> But IMO it is the best available solution towards the support of
> partition expansion in comparison to alternative, no?

At this time, relative to the other alternatives you have listed, this is the
path of least effort to solving this problem. I agree with that. :)

> I can merge those two sections or update the statement if the current
> statement has not clearly explained the reason of partition expansion in
> Kafka.

Given the significance of what you are actually trying to solve, I think it
will be better to have it in points. Let me come find you and we can update
it.

> I have updated wiki and added the task expansion to the Future Work section.
> On the other hand I still keep it in the Rejected Alternative section to
> explain why this future work does not replace the existing proposal in
> SEP-5. Does this sound reasonable?

It is very confusing to me how the same point can be under "Future Work"
and "Rejected Alternative". There is no question about the future work
*replacing* SEP-5. IIUC, this SEP is a subset of the partition expansion
solution. So, I don't think increasing task count should be a rejected
alternative.

> I am also not sure why a feature needs to be "utmost priority" in order
> to be accepted. Can you explain a bit on that?

I don't think I ever claimed that the feature needs to be of "utmost
priority" to be accepted. I was just stating my opinion.


Thanks!
Navina

On Wed, Jun 21, 2017 at 3:52 PM, Dong Lin <lindon...@gmail.com> wrote:

> Thanks much for the reply Navina. Please see my reply inline.
>
> On Wed, Jun 21, 2017 at 2:57 PM, Navina Ramesh (Apache) <nav...@apache.org>
> wrote:
>
> > Thanks to Jake, Dong and Kartik for keeping the discussion going.
> >
> > > Here are the pros and cons of the extra re-partitioning stage in
> > > comparison to SEP-5.
> >
> > I think that is a good summarization of pros/cons for the repartitioning
> > stage based solution. Can you please include it in your SEP? It seems like
> > you already have access. If you are still unable to access the wiki page,
> > feel free to walk over to Samza area and find me!
> >
>
> Sure. I have added this summary to the Alternative Section.
>
>
> >
> > > I think there is always a way for user to mess up their job if they
> > > configure the Samza job incorrectly.
> >
> > I don't think Jake or anyone is arguing about an "incorrectly" configured
> > Samza job. The question was towards how easy/difficult it is for users to
> > *not mess* up their job with incorrect configurations.
> >
> > > I also think the assumption made in this SEP is not particularly harder
> > > to understand than other existing configs in Samza.
> >
> > I disagree here. Other configs don't require you to understand more than
> > one assumption.
> >
> > There is already an overload of configs in Samza and I think we are trying
> > to shield it as much as possible from the users (esp. with fluent api).
> > More specifically, we don't want the user to know about the internals of
> > Samza such as ssp grouper, taskname grouper etc. Since the proposed solution
> > makes the configuration more complex to understand, it *is a* burden on the
> > user.
> >
> > Just because configs are the way they are, it doesn't mean we should increase
> > their complexity and push the burden on users to manage them correctly. My
> > two cents.
> >
>
> Sure, I agree the proposal requires user to understand the assumption in
> order to expand the partition of the topic. But it is very subjective as to
> whether the added complexity is acceptable or not. If there is better way
> to allow user to expand partition of the input stream without making
> assumption, then we can just do that. The current solution is not perfect.
> But IMO it is the best available solution towards the support of partition
> expansion in comparison to alternative, no?
>
>
> > Here are a few things that I believe are needed for wrapping up the SEP:
> >
> > 1. For the longest time, I thought partition expansion happens in Kafka
> > only when the volume of messages across partitions is too high. Based on
> > this assumption, I would only assume that re-mapping expanded partitions to
> > the same task will have adverse effect on the throughput/resource
> > utilization of the processor/container in Samza (for example, disk
> > utilization may increase significantly. With disk quota throttling, it
> > could cause the processor to drop.). However, after speaking with Xinyu, it
> > turns out that partition expansion also happens when there is a
> > per-partition data retention limit imposed by Kafka (not sure if it is only
> > in LinkedIn or in Kafka open-source as well). Imo, this is the primary
> > use-case that we are trying to solve for in Samza and it is not very
> > obvious from the SEP.
> > @Dong, can you please explain *the circumstances under which partition
> > expansion can happen*, under "Motivation" section?  I disagree with the
> > current motivation described as -> "This design doc provides a solution to
> > increase partition number of the input streams of a stateful Samza job
> > while still ensuring the correctness of Samza job output. "
> > This is a solution, albeit not fully done through this SEP alone.
> >
>
> This is actually already described in the Problem and Goal section, i.e.
> "For example, Kafka generally needs to limit the maximum size of each
> partition to scale up its performance. Thus the number of partitions of a
> Kafka topic needs to be expanded to reduce the partition size if the
> average byte-in-rate or retention time of the Kafka topic has doubled". I
> can merge those two sections or update the statement if the current
> statement has not clearly explained the reason of partition expansion in
> Kafka.
>
>
> >
> > 2. I think we are in consensus about the fact that increasing the task
> > number and handling the state correctly is a good solution for Samza in the
> > long-run. In your rejected alternatives, you mention "However, this feature
> > alone does not solve the problem of allowing partition expansion.". What
> > else is required to allow partition expansion? Can you please elaborate on
> > that in point #1 of the rejected alternatives? If there is still more work
> > to be done to support partition expansion in Samza, it is worthwhile to
> > mention it under *Future Work*, instead of under "Rejected Alternatives".
> > Perhaps you were waiting for edit permissions to the wiki. Please make this
> > change so it is well-tracked.
> >
>
> I thought this is already explained in the rejected alternative section.
> More specifically, it is said that "However, this feature alone does not
> solve the problem of allowing partition expansion. For example, say we have
> a job that joins two streams both of which have 3 partitions. If partition
> number of one stream increases from 3 to 6, we would still want the task
> number to remain 3 to make sure that messages with the same key from both
> streams will be handled by the same task. This needs to be done with the
> new grouper classes proposed in this doc."
>
> Does this explanation make sense?
>
> I have updated wiki and added the task expansion to the Future Work
> section. On the other hand I still keep it in the Rejected Alternative
> section to explain why this future work does not replace the existing
> proposal in SEP-5. Does this sound reasonable?
>
>
> > I am still not totally crazy about the proposed solution because it is not
> > clear for open-source, who or which use-cases stand to benefit. I am not
> > convinced that this problem is of utmost priority for the Samza community
> > *at this point of time*.
> >
>
> I think the Problem and Goal section and the Motivation section have
> illustrated the use-case for this feature. Let me answer your questions
> more specifically:
>
> *Who will benefit from this feature:* any Samza user who runs stateful job
> with input from Kafka and needs to expand partition of the input stream so
> that the single partition size doesn't exceed a threshold.
>
> *Which use-case stand to benefit:* this SEP-5 is useful if user runs
> stateful job with input from Kafka and needs to expand partition of the
> input stream so that the single partition size doesn't exceed a threshold.
>
> *Why it is an important feature:* a user needs this feature if they run a
> stateful job with input from Kafka and the partition size of Kafka has
> become too large due to increase in throughput or increase in retention
> time.
>
> I am not sure what kind of feature can be classified as "utmost priority".
> I am also not sure why a feature needs to be "utmost priority" in order to
> be accepted. Can you explain a bit on that? I think we should develop
> feature that has a valid use-case.
>
>
> > I am on the same page as Jake on this one. Not a +1, just a 0 (if that even
> > matters).
> >
> > Thanks!
> > Navina
> >
> > On Sun, Jun 18, 2017 at 12:04 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > BTW, I will update the SEP-5 wiki with our latest discussion after I have
> > > got the wiki edit access.
> > >
> > > On Sat, Jun 17, 2017 at 11:36 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Thanks everyone for the comment!
> > > >
> > > > I am currently leaning towards the current approach. I think Kartik raised
> > > > a good point that the extra repartitioning stage will also incur additional
> > > > throughput on Kafka in addition to the potential storage cost. Can other
> > > > Samza developers also chime in and provide your opinions on this proposal?
> > > >
> > > > Since this discussion thread has been open for three weeks, I will
> > > > initiate voting thread on Monday if there is no major revision suggestion.
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > >
> > > > On Thu, Jun 15, 2017 at 6:32 PM, Kartik Paramasivam
> > > > <kparamasi...@linkedin.com.invalid> wrote:
> > > >
> > > >> Great discussion !
> > > >>
> > > >> Here are some more thoughts
> > > >>
> > > >> The point that repartitioning is a more general purpose solution is surely
> > > >> spot on.  For many source systems (Kinesis, Google Pub-Sub, any of the
> > > >> older queuing systems (rabbitMQ etc.)), repartitioning is anyways
> > > >> functionally required to do even simple keyed aggregations.   But in most
> > > >> of these systems, the concept of repartitioning either does not exist or
> > > >> exists in a way which is very unique (e.g. Kinesis).
> > > >>
> > > >> I think this feature is really only interesting for source systems like
> > > >> Kafka and EventHub.  EventHub (last I checked) didn't support
> > > >> repartitioning. So this is probably not super-interesting (yet) for
> > > >> EventHub.
> > > >>
> > > >> So Kafka is clearly the main use case here.
> > > >>
> > > >> For Kafka, I think it is pretty rare for people to customize the hashing
> > > >> algorithm for sending messages.  I would argue that less than 5% of the
> > > >> population (i am being generous ;)) would do that.   The current proposal
> > > >> works with the default hashing scheme for Kafka.  So organizations will
> > > >> typically never have to coordinate.
> > > >>
> > > >> If the proposed alternative (always repartition) was side-effect free,
> > > >> then it would make sense to use an alternative design that would work for
> > > >> 100% of the population.    Repartitioning all input would however not be a
> > > >> feasible solution (at least at LinkedIn) as it would double the kafka
> > > >> workload.    If many samza jobs read from kafka topics, then the increase
> > > >> would be a function of the number of samza jobs.
> > > >>
> > > >> For low throughput kafka topics, surely explicit repartitioning using
> > > >> fluent api is feasible.
> > > >>
> > > >> If the proposal was to make this new policy the default then that would
> > > >> clearly not make much sense.
> > > >>
> > > >> But it is an opt in policy.  If it is not applicable, people don't have to
> > > >> use it.
> > > >>
> > > >> I do have some questions about the implementation. I will try to respond
> > > >> back after spending some more time on this.
> > > >>
> > > >>
> > > >>
> > > >> On Thu, Jun 15, 2017 at 7:53 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
> > > >>
> > > >> > Thanks, Dong.
> > > >> >
> > > >> > The summary looks accurate.
> > > >> >
> > > >> > I'll let the others chime in, as I believe my perspective has been
> > > >> > adequately captured in this thread.
> > > >> >
> > > >> > -Jake
> > > >> >
> > > >> > On Wed, Jun 14, 2017 at 12:12 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >> >
> > > >> > > Hey Jacob,
> > > >> > >
> > > >> > > Thank you for taking so much time to discuss with me! I appreciate the
> > > >> > > discussion and the insight. I will summarize our discussion below.
> > > >> > >
> > > >> > > 1) Whether it is reasonable to store partition-to-task mapping.
> > > >> > >
> > > >> > > We agree that this partition-to-task mapping will be reasonable if we
> > > >> > > allow user to specify either the new-partition-to-old-partition mapping
> > > >> > > or key-to-partition mapping in the future. SEP-5 doesn't currently
> > > >> > > provide a way for user to specify new-partition-to-old-partition
> > > >> > > mapping because we don't have a good idea about that interface until we
> > > >> > > try to enable partition expansion for input system other than Kafka in
> > > >> > > the future. This is currently specified as the third future work in
> > > >> > > SEP-5.
> > > >> > >
> > > >> > > And if we decide to implement SEP-5, I will include a warning message
> > > >> > > regarding the use of partition-to-task, i.e. "this does not specify the
> > > >> > > key-to-task mapping". We agree that this could address the concern
> > > >> > > here.
> > > >> > >
> > > >> > > 2) Whether we should follow the approach in SEP-5 or use an extra
> > > >> > > re-partitioning stage in the stateful Samza job to enable partition
> > > >> > > expansion.
> > > >> > >
> > > >> > > Here are the pros and cons of the extra re-partitioning stage in
> > > >> > > comparison to SEP-5.
> > > >> > >
> > > >> > > Pros:
> > > >> > > - It doesn't require owner of the Samza job to know the partitioning
> > > >> > > algorithm used for the input stream. If the owner of the Samza job is
> > > >> > > in a different organization than the producer of the input stream, this
> > > >> > > solution frees different organizations from having to coordinate with
> > > >> > > each other.
> > > >> > > - It doesn't require owner of the Samza job to specify the partitioning
> > > >> > > algorithm used for the input stream. Thus less config.
> > > >> > >
> > > >> > > Cons:
> > > >> > > - User has to make code change on their side to use the new fluent API.
> > > >> > > - The extra partitioning stage would potentially increase latency.
> > > >> > > - The extra partitioning stage would incur additional cost due to the
> > > >> > > extra internal topic. The cost is probably not that much with the new
> > > >> > > trim() API in Kafka if Samza uses Kafka to store the internal topic. But
> > > >> > > the cost may be doubled if Samza uses another input system that doesn't
> > > >> > > provide trim() API to delete data on demand.
> > > >> > >
> > > >> > > My recommendation is to adopt a hybrid solution, i.e. we still implement
> > > >> > > the current proposal in SEP-5 so that we enable partition expansion
> > > >> > > without incurring extra latency/cost and without requiring users to
> > > >> > > change their code. And we can recommend user to use the extra
> > > >> > > partitioning stage if the coordination among different organization is
> > > >> > > indeed a concern.
> > > >> > >
> > > >> > > Can other developers also provide feedback regarding your preference
> > > >> > > between the two?
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Dong
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > On Tue, Jun 13, 2017 at 9:30 AM, Jacob Maes <jacob.m...@gmail.com>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Hey Dong,
> > > >> > > >
> > > >> > > > I appreciate your thoughtful responses. Let's do one more round :-)
> > > >> > > >
> > > >> > > >
> > > >> > > > > Here are my current concern with the three alternatives you described
> > > >> > > > > earlier:
> > > >> > > > > - The first alternative requires support from input system which is
> > > >> > > > > currently not available. It will limit the usage of partition
> > > >> > > > > expansion to only systems that support such interface. And it is not
> > > >> > > > > guaranteed that we can persuade the developer of the input system to
> > > >> > > > > add this interface. This is not desirable for Samza in the long term.
> > > >> > > >
> > > >> > > > Agreed. It is very wishful thinking that each supported system would
> > > >> > > > provide such a contract.
> > > >> > > >
> > > >> > > >
> > > >> > > > >
> > > >> > > > > - I can not comment on the second alternative because I don't
> > > >> > > > > understand how it reshuffles all existing changelog data. We can
> > > >> > > > > discuss more if there is more specific detail. My gut feel is that
> > > >> > > > > this will be complex and carries performance overhead.
> > > >> > > >
> > > >> > > > After giving this more thought, I agree. There is no clear way to
> > > >> > > > migrate a changelog without knowing the original key->partition
> > > >> > > > mapping. Which leads us to alternative 3...
> > > >> > > >
> > > >> > > >
> > > >> > > > >
> > > >> > > > > - The third alternative requires performance overhead. Given that user
> > > >> > > > > can already use this solution to enable partition expansion, maybe
> > > >> > > > > Samza developers can provide more input as to why we are not doing it
> > > >> > > > > by default. My gut feel is that it carries considerable performance
> > > >> > > > > overhead and increases the cost-to-serve Samza job (e.g. disk usage),
> > > >> > > > > which may make it undesirable in the long term.
> > > >> > > >
> > > >> > > > I think the only performance overhead would be the mandatory
> > > >> > > > repartitioning stage for stateful jobs. But a repartitioner is usually
> > > >> > > > much faster than the downstream stateful job, so it only seems a cost
> > > >> > > > to serve issue.
> > > >> > > >
> > > >> > > > As for why we aren't already doing this, I would posit that before the
> > > >> > > > introduction of the high level API, which trivializes repartitioning, it
> > > >> > > > was unreasonable to expect each job owner to do the mandatory
> > > >> > > > repartitioning. With the high level API, I would argue this is much more
> > > >> > > > doable.
> > > >> > > >
> > > >> > > >
> > > >> > > > > I am not sure it is true that "any future feature that utilizes this
> > > >> > > > > mapping without accounting for the assumptions of this SEP is likely to
> > > >> > > > > malfunction". Suppose we allow user to specify new-to-old-partition
> > > >> > > > > mapping, then we can use the partition-to-task mapping correctly
> > > >> > > > > without relying on the assumption in this SEP, right?
> > > >> > > >
> > > >> > > > Right, but my point was that the partition->task mapping is not
> > > >> > > > sufficient by itself. So adding it by itself is potentially misleading.
> > > >> > > >
> > > >> > > > On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >> > > >
> > > >> > > > > Thanks for the reply Jacob. Please see my comment inline.
> > > >> > > > >
> > > >> > > > > On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes <jacob.m...@gmail.com>
> > > >> > > > > wrote:
> > > >> > > > >
> > > >> > > > > > >
> > > >> > > > > > > - For users that need partition expansion of the input streams for
> > > >> > > > > > > stateful job, they have a really big headache in the sense that
> > > >> > > > > > > Samza does not allow partition expansion for stateful job. SEP-5
> > > >> > > > > > > addresses this headache for them.
> > > >> > > > > > > You are right that SEP-5 requires user to understand and enforce
> > > >> > > > > > > limitations across organizations. But it is still much better than
> > > >> > > > > > > not allowing user to expand partitions for stateful jobs at all,
> > > >> > > > > > > right? Did I miss something here?
> > > >> > > > > >
> > > >> > > > > > I guess this one is a matter of perspective.
> > > >> > > > > >
> > > >> > > > > > One argument is that if the system supports one case, it's better
> > > >> > > > > > than none because there is one less scenario in which the system does
> > > >> > > > > > the wrong thing.
> > > >> > > > > >
> > > >> > > > > > The counter argument is for uniform and consistent behavior, which is
> > > >> > > > > > easy for users to understand and properly leverage.
> > > >> > > > > >
> > > >> > > > > > Specifically, I'd argue that the current rule is very simple: "you
> > > >> > > > > > cannot repartition inputs on a stateful job, so you must
> > > >> > > > > > over-partition the initial implementation". To me, while that rule is
> > > >> > > > > > not ideal, its simplicity is better than introducing a new solution
> > > >> > > > > > that has a bunch of caveats, any one of which could be missed. If any
> > > >> > > > > > one of the assumptions in this SEP design is violated, the job would
> > > >> > > > > > behave incorrectly. That puts a lot more burden on the users than the
> > > >> > > > > > simpler rule.
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > > I agree that we have different perspective here. It is true that user
> > > >> > > > > would mess up their job if they used this feature in a wrong way, i.e.
> > > >> > > > > violate the assumption made in SEP-5. On the other hand, I think there
> > > >> > > > > is always a way for user to mess up their job if they configure the
> > > >> > > > > Samza job incorrectly. I also think the assumption made in this SEP is
> > > >> > > > > not particularly harder to understand than other existing configs in
> > > >> > > > > Samza.
> > > >> > > > >
> > > >> > > > > The answer to this can be subjective. I would love to hear perspective
> > > >> > > > > from other developers on this issue.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > >
> > > >> > > > > > That's why I mentioned a few alternatives that, while more complex to
> > > >> > > > > > implement, would provide a more consistent behavior with simple rules
> > > >> > > > > > for the users.
> > > >> > > > > >
> > > >> > > > >
> > > >> > > > > I am open to discuss alternative solutions that can address the
> > > >> > > > > problem in a better manner. I am not opposed to complexity as long as
> > > >> > > > > it gives us good long term benefits.
> > > >> > > > >
> > > >> > > > > Here are my current concern with the three alternatives you described
> > > >> > > > > earlier:
> > > >> > > > >
> > > >> > > > > - The first alternative requires support from input system which is
> > > >> > > > > currently not available. It will limit the usage of partition
> > > >> > > > > expansion to only systems that support such interface. And it is not
> > > >> > > > > guaranteed that we can persuade the developer of the input system to
> > > >> > > > > add this interface. This is not desirable for Samza in the long term.
> > > >> > > > >
> > > >> > > > > - I can not comment on the second alternative because I don't
> > > >> > > > > understand how it reshuffles all existing changelog data. We can
> > > >> > > > > discuss more if there is more specific detail. My gut feel is that
> > > >> > > > > this will be complex and carries performance overhead.
> > > >> > > > >
> > > >> > > > > - The third alternative requires performance overhead. Given that user
> > > >> > > > > can already use this solution to enable partition expansion, maybe
> > > >> > > > > Samza developers can provide more input as to why we are not doing it
> > > >> > > > > by default. My gut feel is that it carries considerable performance
> > > >> > > > > overhead and increases the cost-to-serve Samza job (e.g. disk usage),
> > > >> > > > > which may make it undesirable in the long term.
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > >
> > > >> > > > > > > Yes, we need a similar check for
> > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. If there are
> > > >> > > > > > > more grouper classes needed in the future, we can solve this
> > > >> > > > > > > problem cleanly without new config. Given the previousGrouperClass
> > > >> > > > > > > and newGrouperClass, KafkaCheckpointLogKey will throw exception if
> > > >> > > > > > > and only if newGrouperClass is an instance of previousGrouperClass.
> > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum should extend
> > > >> > > > > > > GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum
> > > >> > > > > > > should extend GroupByPartition. Does this address your concern?
> > > >> > > > > >
> > > >> > > > > > Sounds workable, thanks.
> > > >> > > > > >
> > > >> > > > > > >
> > > >> > > > > > > Can you be more specific why Partition-to-task mapping is not
> > > >> > > > > > > meaningful without some definition of the key-to-partition
> > > >> > > > > > > assignments and why it is incomplete and misleading?
> > > >> > > > > >
> > > >> > > > > > A partition is (in my naive interpretation) an independent queue for
> > > >> > > > > > messages of a particular key set. It is not the *identity* of the
> > > >> > > > > > partition that determines the contents of the associated task's local
> > > >> > > > > > state. Rather it is the *contents* of the partition that affect the
> > > >> > > > > > task's state. A partition-to-task mapping only captures an identity
> > > >> > > > > > relationship: partition1->task1. Without the assumptions of this SEP,
> > > >> > > > > > this is insufficient to determine the assignment of keys to tasks,
> > > >> > > > > > which is what really matters. Therefore, any future feature that
> > > >> > > > > > utilizes this mapping without accounting for the assumptions of this
> > > >> > > > > > SEP is likely to malfunction.
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > I am not sure it is true that "any future feature that utilizes this
> > > >> > > > > mapping without accounting for the assumptions of this SEP is likely to
> > > >> > > > > malfunction". Suppose we allow user to specify new-to-old-partition
> > > >> > > > > mapping, then we can use the partition-to-task mapping correctly
> > > >> > > > > without relying on the assumption in this SEP, right?
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > >
> > > >> > > > > > On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <lindon...@gmail.com>
> > > >> > > > > > wrote:
> > > >> > > > > >
> > > >> > > > > > > Hey Jacob,
> > > >> > > > > > >
> > > >> > > > > > > Thanks for the explanation. It seems that your biggest concern is
> > > >> > > > > > > with the generality of the proposal. Let me try to address this and
> > > >> > > > > > > other comments below.
> > > >> > > > > > >
> > > >> > > > > > > 1) ... it will cause headaches for Samza users ...
> > > >> > > > > > >
> > > >> > > > > > > I am not sure I understand why this proposal causes headache for
> > > >> > > > > > > Samza users. Here is the impact of the SEP-5 on users:
> > > >> > > > > > >
> > > >> > > > > > > - For users that do not need partition expansion of the input
> > > >> > > > > > > stream, they can use Samza without any change in code/binary/config.
> > > >> > > > > > > Thus there is no headache for them.
> > > >> > > > > > >
> > > >> > > > > > > - For users that need partition expansion of the input streams for
> > > >> > > > > > > stateless job, they currently need to manually reboot their Samza
> > > >> > > > > > > job in order to let Samza consume the new partitions created for the
> > > >> > > > > > > stream. SEP-5 actually reduced their headache by allowing Samza to
> > > >> > > > > > > automatically detect and consume new partitions.
> > > >> > > > > > >
> > > >> > > > > > > - For users that need partition expansion of the input streams for
> > > >> > > > > > > stateful job, they have a really big headache in the sense that
> > > >> > > > > > > Samza does not allow partition expansion for stateful job. SEP-5
> > > >> > > > > > > addresses this headache for them.
> > > >> > > > > > >
> > > >> > > > > > > You are right that SEP-5 requires user to understand and enforce
> > > >> > > > > > > limitations across organizations. But it is still much better than
> > > >> > > > > > > not allowing user to expand partitions for stateful jobs at all,
> > > >> > > > > > > right? Did I miss something here?
> > > >> > > > > > >
> > > >> > > > > > > 2) ... Separate orgs are often difficult to coordinate and a system
> > > >> > > > > > > which depends on such significant process/coordination is too
> > > >> > > > > > > fragile for my taste ..
> > > >> > > > > > >
> > > >> > > > > > > This is true. Ideally we want a system that is fully self-serving. I
> > > >> > > > > > > think this is a long term goal for Samza. Still, for the reasons
> > > >> > > > > > > described above, I think something is better than nothing. I am open
> > > >> > > > > > > to alternative design that can support partition expansion for
> > > >> > > > > > > stateful jobs without requiring coordination.
> > > >> > > > > > >
> > > >> > > > > > > 3) There is currently no supported way of sharing state
> > > among
> > > >> the
> > > >> > > > tasks
> > > >> > > > > > of
> > > >> > > > > > > a container.  Each task has its own isolated store and
> > that
> > > >> > logical
> > > >> > > > > > > isolation is the primary thing that enables Samza jobs
> to
> > > >> scale
> > > >> > > with
> > > >> > > > a
> > > >> > > > > > > simple container count change. My feeling is that we
> > should
> > > >> not
> > > >> > > > change
> > > >> > > > > > > this without
> > > >> > > > > > > good reason.
> > > >> > > > > > >
> > > >> > > > > > > I see your point. I will remove this sentence from the
> > > >> motivation
> > > >> > > > > > section.
> > > >> > > > > > > This won't have any impact on the design of the SEP-5.
> > Does
> > > >> this
> > > >> > > > > address
> > > >> > > > > > > the problem?
> > > >> > > > > > >
> > > >> > > > > > > 4) With the current proposal, we'd also need a similar
> > check
> > > >> for
> > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well.
> And
> > > if
> > > >> any
> > > >> > > > other
> > > >> > > > > > > groupers
> > > >> > > > > > > were later added with both these modes, we'd probably
> need
> > > to
> > > >> add
> > > >> > > > those
> > > >> > > > > > > too. It might be easier and cleaner to add a config to
> > > ignore
> > > >> > that
> > > >> > > > > check
> > > >> > > > > > > temporarily. Down side is that it further complicates
> the
> > > >> Samza
> > > >> > > > config,
> > > >> > > > > > > which is already huge. Thoughts?
> > > >> > > > > > >
> > > >> > > > > > > Yes, we need a similar check for
> > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum
> > > >> > > > > > > as well. If more grouper classes are needed in the
> > > >> future,
> > > >> > we
> > > >> > > > can
> > > >> > > > > > > solve this problem cleanly without new config. Given the
> > > >> > > > > > > previousGrouperClass and newGrouperClass,
> > > >> KafkaCheckpointLogKey
> > > >> > > will
> > > >> > > > > > throw
> > > >> > > > > > > exception if and only if newGrouperClass is not a subclass
> > > >> > > > > > > of previousGrouperClass.
> > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum should
> > extend
> > > >> > > > > > > GroupBySystemStreamPartition
> > > >> > > > > > > and GroupByPartitionWithFixedTaskNum should extend
> > > >> > > GroupByPartition.
> > > >> > > > > > Does
> > > >> > > > > > > this address your concern?
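A minimal sketch of the class-based check discussed above, assuming the intent is that the exception is raised unless the new grouper class extends (or equals) the previous one, as the GroupByPartition to GroupByPartitionWithFixedTaskNum example suggests. The classes below are empty Python stand-ins for the Samza groupers, and `DifferingGrouperError` and `check_grouper_change` are hypothetical names used only for illustration; none of this is the actual KafkaCheckpointLogKey code.

```python
class GroupByPartition:
    """Stand-in for the existing GroupByPartition grouper."""


class GroupByPartitionWithFixedTaskNum(GroupByPartition):
    """Stand-in for the proposed grouper; extends the existing one."""


class GroupBySystemStreamPartition:
    """Stand-in for the existing GroupBySystemStreamPartition grouper."""


class GroupBySystemStreamPartitionWithFixedTaskNum(GroupBySystemStreamPartition):
    """Stand-in for the proposed counterpart; extends the existing one."""


class DifferingGrouperError(Exception):
    """Hypothetical stand-in for the exception raised on a grouper mismatch."""


def check_grouper_change(previous_cls, new_cls):
    """Allow the change only if new_cls is previous_cls or one of its subclasses."""
    if not issubclass(new_cls, previous_cls):
        raise DifferingGrouperError(
            f"cannot switch grouper from {previous_cls.__name__} "
            f"to {new_cls.__name__}"
        )


# Upgrading to the fixed-task-number variant is accepted.
check_grouper_change(GroupByPartition, GroupByPartitionWithFixedTaskNum)
```

With this rule no per-grouper hardcoding or extra config is needed: any future grouper that extends its predecessor passes the check, while unrelated switches (for example GroupByPartition to GroupBySystemStreamPartition) are still rejected.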
> > > >> > > > > > >
> > > >> > > > > > > 5) The task-to-container and container-to-host mappings
> > are
> > > >> both
> > > >> > > > > > meaningful
> > > >> > > > > > > in context of the JobModel. Partition-to-task mapping is
> > not
> > > >> > > > meaningful
> > > >> > > > > > > without
> > > >> > > > > > > some definition of the key-to-partition assignments.
> It's
> > > >> > > incomplete
> > > >> > > > > > > information and therefore misleading. I think it only
> > makes
> > > >> sense
> > > >> > > to
> > > >> > > > > use
> > > >> > > > > > > this mapping if we adopt a solution wherein Samza also
> > knows
> > > >> the
> > > >> > > > > > partition
> > > >> > > > > > > key assignment.
> > > >> > > > > > >
> > > >> > > > > > > Partition-to-task is currently explicitly passed from
> job
> > > >> > > coordinator
> > > >> > > > > to
> > > >> > > > > > > each task as part of the job model to tell tasks which
> > > >> partitions
> > > >> > > to
> > > >> > > > > > > consume from. I think we can store some definition of
> the
> > > >> > > > > > key-to-partition
> > > >> > > > > > > assignments if Samza decides to get and use this
> > information
> > > >> in
> > > >> > the
> > > >> > > > > > > future. Can
> > > >> > > > > > > you be more specific why Partition-to-task mapping is
> not
> > > >> > > meaningful
> > > >> > > > > > > without
> > > >> > > > > > > some definition of the key-to-partition assignments and
> > why
> > > >> it is
> > > >> > > > > > > incomplete and misleading?
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > Thanks,
> > > >> > > > > > > Dong
> > > >> > > > > > >
> > > >> > > > > > > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <
> > > >> > jacob.m...@gmail.com>
> > > >> > > > > > wrote:
> > > >> > > > > > >
> > > >> > > > > > > > Hey Dong,
> > > >> > > > > > > >
> > > >> > > > > > > > I'm opposed (or a +0, at best) to this limited,
> > > >> Kafka-specific
> > > >> > > > > > solution.
> > > >> > > > > > > I
> > > >> > > > > > > > understand that the proposal is relatively simple to
> > > >> implement,
> > > >> > > > but I
> > > >> > > > > > > think
> > > >> > > > > > > > it will cause headaches for Samza users. They will not
> > > only
> > > >> > have
> > > >> > > to
> > > >> > > > > > > > understand all the limitations (increase only, double
> > > >> > partitions
> > > >> > > > > only,
> > > >> > > > > > > > partition using hash+modulo, etc) of this approach,
> but
> > > >> > enforcing
> > > >> > > > > these
> > > >> > > > > > > > limitations can be a major problem, especially when
> the
> > > >> Samza
> > > >> > > jobs
> > > >> > > > > and
> > > >> > > > > > > > message brokers are managed by separate orgs in a
> > company.
> > > >> > > Separate
> > > >> > > > > > orgs
> > > >> > > > > > > > are often difficult to coordinate and a system which
> > > >> depends on
> > > >> > > > such
> > > >> > > > > > > > significant process/coordination is too fragile for my
> > > >> taste.
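To make the limitations listed above concrete, here is a small sketch of why hash+modulo partitioning combined with doubling the partition count keeps a key on the same task. The assumptions are mine: `zlib.crc32` is only a deterministic stand-in for whatever hash the broker's partitioner uses, and `task_for` (partition modulo the original partition count) is one illustrative way a fixed-task-number grouper could assign partitions.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """hash+modulo partitioning; crc32 stands in for the broker's hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def task_for(partition: int, original_partition_count: int) -> int:
    """Illustrative fixed-task-number rule: partition p and p + N share task p."""
    return partition % original_partition_count


def same_task_after_expansion(key: str, old_count: int, new_count: int) -> bool:
    """True if the key is handled by the same task before and after expansion."""
    old_task = task_for(partition_for(key, old_count), old_count)
    new_task = task_for(partition_for(key, new_count), old_count)
    return old_task == new_task


OLD_COUNT, NEW_COUNT = 4, 8  # partition count doubled
for i in range(1000):
    key = f"page-{i}"
    old_p = partition_for(key, OLD_COUNT)
    new_p = partition_for(key, NEW_COUNT)
    # After doubling, a key either stays in partition p or moves to p + OLD_COUNT.
    assert new_p in (old_p, old_p + OLD_COUNT)
    assert same_task_after_expansion(key, OLD_COUNT, NEW_COUNT)
```

The property relies on the new count being a multiple of the old one and on the same hash being used before and after. With key-range assignment, or an expansion from 4 to 6 partitions, the second assertion no longer holds in general, which is the coordination burden raised above.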
> > > >> > > > > > > >
> > > >> > > > > > > > That said, I realize that my opinion is just one of
> many
> > > in
> > > >> the
> > > >> > > > > broader
> > > >> > > > > > > > community which may feel differently, so let me
> respond
> > to
> > > >> some
> > > >> > > of
> > > >> > > > > the
> > > >> > > > > > > > other items in the discussion so we can clear them up:
> > > >> > > > > > > >
> > > >> > > > > > > > The task-to-container assignment matters because
> the
> > > >> > > correlated
> > > >> > > > > > tasks
> > > >> > > > > > > > > (i.e. tasks that consume messages with the same key)
> > > >> need to
> > > >> > > be
> > > >> > > > in
> > > >> > > > > > the
> > > >> > > > > > > > > same container so that they can share the same
> > key/value
> > > >> > local
> > > >> > > > > store
> > > >> > > > > > on
> > > >> > > > > > > > the
> > > >> > > > > > > > > same physical machine.
> > > >> > > > > > > >
> > > >> > > > > > > > There is currently no supported way of sharing state
> > among
> > > >> the
> > > >> > > > tasks
> > > >> > > > > > of a
> > > >> > > > > > > > container.  Each task has its own isolated store and
> > that
> > > >> > logical
> > > >> > > > > > > isolation
> > > >> > > > > > > > is the primary thing that enables Samza jobs to scale
> > > with a
> > > >> > > simple
> > > >> > > > > > > > container count change. My feeling is that we should
> not
> > > >> change
> > > >> > > > this
> > > >> > > > > > > > without good reason.
> > > >> > > > > > > >
> > > >> > > > > > > > I think we can hardcode new logic in
> > > >> > KafkaCheckpointLogKey.scala
> > > >> > > > such
> > > >> > > > > > > that
> > > >> > > > > > > > > exception will not be thrown if new grouper is
> > > >> > > > > > > > > GroupByPartitionWithFixedTaskNum and old grouper is
> > > >> > > > > > GroupByPartition.
> > > >> > > > > > > > Does
> > > >> > > > > > > > > this look reasonable?
> > > >> > > > > > > >
> > > >> > > > > > > > With the current proposal, we'd also need a similar
> > check
> > > >> for
> > > >> > > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well.
> > And
> > > >> if
> > > >> > any
> > > >> > > > > other
> > > >> > > > > > > > groupers were later added with both these modes, we'd
> > > >> probably
> > > >> > > need
> > > >> > > > > to
> > > >> > > > > > > add
> > > >> > > > > > > > those too. It might be easier and cleaner to add a
> > config
> > > to
> > > >> > > ignore
> > > >> > > > > > that
> > > >> > > > > > > > check temporarily. Down side is that it further
> > > complicates
> > > >> the
> > > >> > > > Samza
> > > >> > > > > > > > config, which is already huge. Thoughts?
> > > >> > > > > > > >
> > > >> > > > > > > > I think storing the previous task-to-partition mapping
> > is
> > > >> more
> > > >> > > > > general
> > > >> > > > > > > than
> > > >> > > > > > > > > storing the partition count of all topics for the
> > > >> following
> > > >> > > > > reasons:
> > > >> > > > > > > > > - Samza already stores the task-to-container mapping
> > and
> > > >> > > > > > > > container-to-host
> > > >> > > > > > > > > mapping in the coordinator stream. It seems
> consistent
> > > to
> > > >> > also
> > > >> > > > > store
> > > >> > > > > > > the
> > > >> > > > > > > > > partition-to-task mapping. And this information may
> be
> > > >> useful
> > > >> > > for
> > > >> > > > > > other
> > > >> > > > > > > > > use-case such as debugging.
> > > >> > > > > > > > > - By having the new interface take the previous
> > > >> > > task-to-partition
> > > >> > > > > > > > > assignment instead of a topic-to-partition-count
> > mapping
> > > >> as
> > > >> > new
> > > >> > > > > > > > parameter,
> > > >> > > > > > > > > we can potentially have grouper implementation to
> > > support
> > > >> > other
> > > >> > > > > types
> > > >> > > > > > > of
> > > >> > > > > > > > > input systems.
> > > >> > > > > > > > > - It is slightly simpler to store the
> task-to-partition
> > > >> > > assignment
> > > >> > > > > > > because
> > > >> > > > > > > > > we don't need to know whether this is the first
> time a
> > > >> job is
> > > >> > > > > started
> > > >> > > > > > > or
> > > >> > > > > > > > > not. On the other hand, you can write
> > > >> > topic-to-partition-count
> > > >> > > > > > mapping
> > > >> > > > > > > to
> > > >> > > > > > > > > the coordinator stream only if this is the first
> time
> > > the
> > > >> job
> > > >> > > is
> > > >> > > > > run
> > > >> > > > > > > >
> > > >> > > > > > > > The task-to-container and container-to-host mappings
> are
> > > >> both
> > > >> > > > > > meaningful
> > > >> > > > > > > in
> > > >> > > > > > > > context of the JobModel. Partition-to-task mapping is
> > not
> > > >> > > > meaningful
> > > >> > > > > > > > without some definition of the key-to-partition
> > > assignments.
> > > >> > It's
> > > >> > > > > > > > incomplete information and therefore misleading. I
> think
> > > it
> > > >> > only
> > > >> > > > > makes
> > > >> > > > > > > > sense to use this mapping if we adopt a solution
> wherein
> > > >> Samza
> > > >> > > also
> > > >> > > > > > knows
> > > >> > > > > > > > the partition key assignment.
> > > >> > > > > > > >
> > > >> > > > > > > > -Jake
> > > >> > > > > > > >
> > > >> > > > > > > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <
> > > >> lindon...@gmail.com
> > > >> > >
> > > >> > > > > wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > Hey Jacob,
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thanks for taking time to review the SEP.
> > > >> > > > > > > > >
> > > >> > > > > > > > > I agree with you and Navina that the current SEP
> > doesn't
> > > >> > > provide
> > > >> > > > > > > support
> > > >> > > > > > > > for
> > > >> > > > > > > > > arbitrary input systems and it doesn't support
> > partition
> > > >> > > shrink.
> > > >> > > > I
> > > >> > > > > > > think
> > > >> > > > > > > > > the scope of this SEP is to support partition
> > expansion
> > > >> for
> > > >> > > Kafka
> > > >> > > > > > (the
> > > >> > > > > > > > most
> > > >> > > > > > > > > widely used input system of Samza) and keep the door
> > > open
> > > >> for
> > > >> > > > > > partition
> > > >> > > > > > > > > support of various input systems. The current design
> > can
> > > >> > > support
> > > >> > > > > any
> > > >> > > > > > > > system
> > > >> > > > > > > > > that meets the two operational requirements specified
> > in
> > > >> the
> > > >> > > doc.
> > > >> > > > > > > > >
> > > >> > > > > > > > > While it is possible to support more types of input
> > > >> systems,
> > > >> > it
> > > >> > > > > will
> > > >> > > > > > > > likely
> > > >> > > > > > > > > add more complexity to the design. For example, the
> > > first
> > > >> > > > > alternative
> > > >> > > > > > > > > solution from you requires broker-side support to
> > > >> negotiate
> > > >> > > hash
> > > >> > > > > > > > algorithm.
> > > >> > > > > > > > > The second alternative solution requires changelog
> > > >> partition
> > > >> > > > > > reshuffle
> > > >> > > > > > > > > which carries its own design complexity and
> > performance
> > > >> > > overhead.
> > > >> > > > > > There
> > > >> > > > > > > > is
> > > >> > > > > > > > > tradeoff between the generality and the complexity
> > among
> > > >> > these
> > > >> > > > > > > choices. I
> > > >> > > > > > > > > like the current design because it is simple and
> > > >> addresses a
> > > >> > > big
> > > >> > > > > > usage
> > > >> > > > > > > > > scenario for us. We can add more complexity to
> > > generalize
> > > >> the
> > > >> > > > > design
> > > >> > > > > > if
> > > >> > > > > > > > it
> > > >> > > > > > > > > enables important use-case. Does this sound
> > reasonable?
> > > >> > > > > > > > >
> > > >> > > > > > > > > Note that the "Rejected Alternative" section also
> > > mentions
> > > >> > the
> > > >> > > > > > > > possibility
> > > >> > > > > > > > > of supporting a wider range of input systems by
> > allowing
> > > >> user
> > > >> > > to
> > > >> > > > > > > specify
> > > >> > > > > > > > > the new-partition to old-partition mapping. We are
> not
> > > >> doing
> > > >> > it
> > > >> > > > > > because
> > > >> > > > > > > > 1)
> > > >> > > > > > > > > we may have better understanding of the design after
> > we
> > > >> have
> > > >> > a
> > > >> > > > > > specific
> > > >> > > > > > > > > second input system to support 2) the current design
> > can
> > > >> be
> > > >> > > > > extended
> > > >> > > > > > to
> > > >> > > > > > > > > support general input systems. I think similar
> > argument
> > > >> can
> > > >> > be
> > > >> > > > > > applied
> > > >> > > > > > > > > to explain why we don't have to support general input
> > > systems
> > > >> > > using
> > > >> > > > > the
> > > >> > > > > > > > > potentially-good alternatives you mentioned.
> > > >> > > > > > > > >
> > > >> > > > > > > > > I hope SEP-5 can be an important first-step towards
> > > >> > supporting
> > > >> > > > > > > partition
> > > >> > > > > > > > > expansion of any input system.
> > > >> > > > > > > > >
> > > >> > > > > > > > > To answer your questions about the current proposal:
> > > >> > > > > > > > >
> > > >> > > > > > > > > >1. "An alternative solution is to allow task number
> > to
> > > >> > > increase
> > > >> > > > > > after
> > > >> > > > > > > > > >partition expansion and uses a proper
> > task-to-container
> > > >> > > > assignment
> > > >> > > > > > to
> > > >> > > > > > > > make
> > > >> > > > > > > > > >sure the Samza output is correct." What does the
> > > >> container
> > > >> > > have
> > > >> > > > to
> > > >> > > > > > do
> > > >> > > > > > > > with
> > > >> > > > > > > > > >stateful processing or output in general?
> > > >> > > > > > > > >
> > > >> > > > > > > > > The task-to-container assignment matters because
> > the
> > > >> > > > correlated
> > > >> > > > > > > tasks
> > > >> > > > > > > > > (i.e. tasks that consume messages with the same key)
> > > >> need to
> > > >> > > be
> > > >> > > > in
> > > >> > > > > > the
> > > >> > > > > > > > > same container so that they can share the same
> > key/value
> > > >> > local
> > > >> > > > > store
> > > >> > > > > > on
> > > >> > > > > > > > the
> > > >> > > > > > > > > same physical machine.
> > > >> > > > > > > > >
> > > >> > > > > > > > > >2. When you use "Join" as an example, you basically
> > > mean
> > > >> > > > multiple
> > > >> > > > > > > > > >co-partitioned streams, right? This is opposed to
> > > >> multiple,
> > > >> > > > > > > > > >independently-partitioned streams or a single
> stream.
> > > >> Would
> > > >> > be
> > > >> > > > > nice
> > > >> > > > > > to
> > > >> > > > > > > > > >formulate the proposal in these more general terms.
> > > >> > > > > > > > >
> > > >> > > > > > > > > I thought "join" is commonly used to refer to the
> > join
> > > >> > > > operation
> > > >> > > > > > with
> > > >> > > > > > > > > co-partitioned stream but I may be wrong. I have
> > updated
> > > >> the
> > > >> > > wiki
> > > >> > > > > to
> > > >> > > > > > > > > explicitly mention "co-partitioned stream". Does
> this
> > > look
> > > >> > > better
> > > >> > > > > > now?
> > > >> > > > > > > > >
> > > >> > > > > > > > > >3. When switching SSP groupers, how will the users
> > > avoid
> > > >> the
> > > >> > > > > > > > > >org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > > >> > > > > > > > > >exception?
> > > >> > > > > > > > >
> > > >> > > > > > > > > I think we can hardcode new logic in
> > > >> > > KafkaCheckpointLogKey.scala
> > > >> > > > > such
> > > >> > > > > > > > that
> > > >> > > > > > > > > exception will not be thrown if new grouper is
> > > >> > > > > > > > > GroupByPartitionWithFixedTaskNum and old grouper is
> > > >> > > > > > GroupByPartition.
> > > >> > > > > > > > Does
> > > >> > > > > > > > > this look reasonable?
> > > >> > > > > > > > >
> > > >> > > > > > > > > >4. Partition to task assignment is meaningless
> > without
> > > >> key
> > > >> > to
> > > >> > > > > > > partition
> > > >> > > > > > > > > >mapping. The real semantics are captured in the
> > > external
> > > >> > > > > requirement
> > > >> > > > > > > for
> > > >> > > > > > > > > >partitioning via hash+modulo. But in that case,
> iiuc,
> > > >> only
> > > >> > the
> > > >> > > > > > > partition
> > > >> > > > > > > > > >count matters. So why not just store the original
> > > >> partition
> > > >> > > > count
> > > >> > > > > > > rather
> > > >> > > > > > > > > >than the whole mapping?
> > > >> > > > > > > > >
> > > >> > > > > > > > > I think storing the previous task-to-partition
> mapping
> > > is
> > > >> > more
> > > >> > > > > > general
> > > >> > > > > > > > than
> > > >> > > > > > > > > storing the partition count of all topics for the
> > > >> following
> > > >> > > > > reasons:
> > > >> > > > > > > > >
> > > >> > > > > > > > > - Samza already stores the task-to-container mapping
> > and
> > > >> > > > > > > > container-to-host
> > > >> > > > > > > > > mapping in the coordinator stream. It seems
> consistent
> > > to
> > > >> > also
> > > >> > > > > store
> > > >> > > > > > > the
> > > >> > > > > > > > > partition-to-task mapping. And this information may
> be
> > > >> useful
> > > >> > > for
> > > >> > > > > > other
> > > >> > > > > > > > > use-case such as debugging.
> > > >> > > > > > > > >
> > > >> > > > > > > > > - By having the new interface take the previous
> > > >> > > task-to-partition
> > > >> > > > > > > > > assignment instead of a topic-to-partition-count
> > mapping
> > > >> as
> > > >> > new
> > > >> > > > > > > > parameter,
> > > >> > > > > > > > > we can potentially have grouper implementation to
> > > support
> > > >> > other
> > > >> > > > > types
> > > >> > > > > > > of
> > > >> > > > > > > > > input systems.
> > > >> > > > > > > > >
> > > >> > > > > > > > > - It is slightly simpler to store the
> task-to-partition
> > > >> > > assignment
> > > >> > > > > > > because
> > > >> > > > > > > > > we don't need to know whether this is the first
> time a
> > > >> job is
> > > >> > > > > started
> > > >> > > > > > > or
> > > >> > > > > > > > > not. On the other hand, you can write
> > > >> > topic-to-partition-count
> > > >> > > > > > mapping
> > > >> > > > > > > to
> > > >> > > > > > > > > the coordinator stream only if this is the first
> time
> > > the
> > > >> job
> > > >> > > is
> > > >> > > > > run
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thanks,
> > > >> > > > > > > > > Dong
> > > >> > > > > > > > >
> > > >> > > > > > > > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <
> > > >> > > > jacob.m...@gmail.com>
> > > >> > > > > > > > wrote:
> > > >> > > > > > > > >
> > > >> > > > > > > > > > Hey Dong,
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Thanks for the SEP. Supporting partition changes
> is
> > > >> > > critically
> > > >> > > > > > > > important
> > > >> > > > > > > > > > for stateful Samza jobs, so it's great to see some
> > > >> ideas on
> > > >> > > > that
> > > >> > > > > > > front!
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Sorry for the late feedback, but I have a few
> > thoughts
> > > >> to
> > > >> > > > > > contribute.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Big +1 on Navina's comment:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > > My biggest gripe with this SEP is that it seems
> > > like a
> > > >> > > > > > tailor-made
> > > >> > > > > > > > > > > solution
> > > >> > > > > > > > > > > that relies on the semantics of the Kafka system
> > and
> > > >> yet,
> > > >> > > we
> > > >> > > > > are
> > > >> > > > > > > > trying
> > > >> > > > > > > > > > to
> > > >> > > > > > > > > > > masquerade that as operational requirements for
> > > other
> > > >> > > systems
> > > >> > > > > > > > > interacting
> > > >> > > > > > > > > > > with Samza. (Not to say that this is the first
> > time
> > > >> such
> > > >> > a
> > > >> > > > > choice
> > > >> > > > > > > is
> > > >> > > > > > > > > > being
> > > >> > > > > > > > > > > made in the Samza design). I am not seeing how
> > this
> > > >> can be a
> > > >> > > > > > "general"
> > > >> > > > > > > > > > > solution for all input systems. That's my two
> > > cents. I
> > > >> > > would
> > > >> > > > > like
> > > >> > > > > > > to
> > > >> > > > > > > > > hear
> > > >> > > > > > > > > > > alternative points of view for this from other
> > devs.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Two examples of this:
> > > >> > > > > > > > > > 1. This is mostly a hypothetical, but some message
> > > >> brokers
> > > >> > > may
> > > >> > > > > use
> > > >> > > > > > > key
> > > >> > > > > > > > > > range assignment rather than hash+modulo.
> > > >> > > > > > > > > > 2. Kafka can't reduce the number of partitions,
> but
> > it
> > > >> can
> > > >> > > > happen
> > > >> > > > > > on
> > > >> > > > > > > > > other
> > > >> > > > > > > > > > systems. For example, it may be cheaper to reduce
> > the
> > > >> > number
> > > >> > > of
> > > >> > > > > > > > > partitions
> > > >> > > > > > > > > > on a hosted service where the cost model depends
> on
> > > the
> > > >> > > number
> > > >> > > > of
> > > >> > > > > > > > > > partitions/shards.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > It seems to me that we need a solution which doesn't
> depend
> > on
> > > >> > > > partition
> > > >> > > > > > key
> > > >> > > > > > > > > > assignment in the message broker. Here are a few
> > > >> > alternatives
> > > >> > > > > that
> > > >> > > > > > > > > weren't
> > > >> > > > > > > > > > discussed and I think should be considered:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Alternatives in order of increasing preference:
> > > >> > > > > > > > > > 1. Samza manages the partition hash (via some new
> > > >> contract
> > > >> > > with
> > > >> > > > > the
> > > >> > > > > > > > > > brokers) and guarantees correct routing of keys
> > among
> > > >> the
> > > >> > new
> > > >> > > > > > > > partitions.
> > > >> > > > > > > > > > 2. Samza detects a task count change, creates a
> new
> > > >> > changelog
> > > >> > > > > with
> > > >> > > > > > > > > correct
> > > >> > > > > > > > > > partitions, and *somehow* reshuffles all existing
> > > >> changelog
> > > >> > > > data
> > > >> > > > > > into
> > > >> > > > > > > > the
> > > >> > > > > > > > > > new topic and then uses the new topic from then
> on.
> > > >> > (doesn't
> > > >> > > > work
> > > >> > > > > > > > without
> > > >> > > > > > > > > > changelog, but in that case durability isn't
> > > paramount,
> > > >> so
> > > >> > we
> > > >> > > > can
> > > >> > > > > > > just
> > > >> > > > > > > > > > wipe)
> > > >> > > > > > > > > > 3. Use RPC in between stages and samza fully
> manages
> > > key
> > > >> > > > > assignment
> > > >> > > > > > > > among
> > > >> > > > > > > > > > tasks. No on-disk topic data to clean up.
> Mandatory
> > > >> > > > > repartitioning
> > > >> > > > > > in
> > > >> > > > > > > > the
> > > >> > > > > > > > > > first stage to pre-scaled tasks in next stage.
> > > >> > > > > > > > > > 4. Combined 2-3 solution
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Finally, some questions about the current
> proposal:
> > > >> > > > > > > > > > 1. "An alternative solution is to allow task
> number
> > to
> > > >> > > increase
> > > >> > > > > > after
> > > >> > > > > > > > > > partition expansion and uses a proper
> > > task-to-container
> > > >> > > > > assignment
> > > >> > > > > > to
> > > >> > > > > > > > > make
> > > >> > > > > > > > > > sure the Samza output is correct." What does the
> > > >> container
> > > >> > > have
> > > >> > > > > to
> > > >> > > > > > do
> > > >> > > > > > > > > with
> > > >> > > > > > > > > > stateful processing or output in general?
> > > >> > > > > > > > > > 2. When you use "Join" as an example, you
> basically
> > > mean
> > > >> > > > multiple
> > > >> > > > > > > > > > co-partitioned streams, right? This is opposed to
> > > >> multiple,
> > > >> > > > > > > > > > independently-partitioned streams or a single
> > stream.
> > > >> Would
> > > >> > > be
> > > >> > > > > nice
> > > >> > > > > > > to
> > > >> > > > > > > > > > formulate the proposal in these more general
> terms.
> > > >> > > > > > > > > > 3. When switching SSP groupers, how will the users
> > > avoid
> > > >> > the
> > > >> > > > > > > > > > org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > > >> > > > > > > > > > exception?
> > > >> > > > > > > > > > 4. Partition to task assignment is meaningless
> > without
> > > >> key
> > > >> > to
> > > >> > > > > > > partition
> > > >> > > > > > > > > > mapping. The real semantics are captured in the
> > > external
> > > >> > > > > > requirement
> > > >> > > > > > > > for
> > > >> > > > > > > > > > partitioning via hash+modulo. But in that case,
> > iiuc,
> > > >> only
> > > >> > > the
> > > >> > > > > > > > partition
> > > >> > > > > > > > > > count matters. So why not just store the original
> > > >> partition
> > > >> > > > count
> > > >> > > > > > > > rather
> > > >> > > > > > > > > > than the whole mapping?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > -Jake
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <
> > > >> > > lindon...@gmail.com
> > > >> > > > >
> > > >> > > > > > > wrote:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > > Hey Yi, Navina,
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > I have updated the SEP-5 document based on our
> > > >> > discussion.
> > > >> > > > The
> > > >> > > > > > > > > difference
> > > >> > > > > > > > > > > can be found here
> > > >> > > > > > > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>.
> > > >> > > > > > > > > > > Here is the summary of changes:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > - Add new interface that extends the existing
> > > >> interface
> > > >> > > > > > > > > > > SystemStreamPartitionGrouper. Newly-added
> grouper
> > > >> class
> > > >> > > > should
> > > >> > > > > > > > > implement
> > > >> > > > > > > > > > > this interface.
> > > >> > > > > > > > > > > - Explained in the Rejected Alternative Section
> > why
> > > we
> > > >> > > don't
> > > >> > > > > add
> > > >> > > > > > > new
> > > >> > > > > > > > > > method
> > > >> > > > > > > > > > > in the existing interface
> > > >> > > > > > > > > > > - Explained in the Rejected Alternative Section
> > why
> > > we
> > > >> > > don't
> > > >> > > > > > > > > add a config/class
> > > >> > > > > > > > > > > for user to specify new-partition to
> old-partition
> > > >> > mapping.
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Can you take another look at the proposal and
> let
> > me
> > > >> know
> > > >> > > if
> > > >> > > > > > there
> > > >> > > > > > > is
> > > >> > > > > > > > > any
> > > >> > > > > > > > > > > concern?
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Cheers,
> > > >> > > > > > > > > > > Dong
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <
> > > >> > > > lindon...@gmail.com
> > > >> > > > > >
> > > >> > > > > > > > wrote:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > > Hey Yi,
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > Thanks much for the comment. I have updated
> the
> > > doc
> > > >> to
> > > >> > > > > address
> > > >> > > > > > > all
> > > >> > > > > > > > > your
> > > >> > > > > > > > > > > > comments except the one related to the
> > interface.
> > > I
> > > >> am
> > > >> > > not
> > > >> > > > > > sure I
> > > >> > > > > > > > > > > > understand your suggestion of the new
> interface.
> > > >> Will
> > > >> > > > discuss
> > > >> > > > > > > > > tomorrow.
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > > Dong
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <
> > > >> > > > nickpa...@gmail.com
> > > >> > > > > >
> > > >> > > > > > > > wrote:
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >> Hi, Dong,
> > > >> > > > > > > > > > > >>
> > > >> > > > > > > > > > > >> Thanks for the detailed design doc for a
> > > >> long-awaited
> > > >> > > > feature
> > > >> > > > > > in
> > > >> > > > > > > > > Samza!
> > > >> > > > > > > > > > > >> Really appreciate it! I did a quick pass and
> > have
> > > >> the
> > > >> > > > > > following
> > > >> > > > > > > > > > > comments:
> > > >> > > > > > > > > > > >>
> > > >> > > > > > > > > > > >> - minor: "limit the maximum size of
> partition"
> > > ==>
> > > >> > > "limit
> > > >> > > > > the
> > > >> > > > > > > > > maximum
> > > >> > > > > > > > > > > size
> > > >> > > > > > > > > > > >> of each partition"
> > > >> > > > > > > > > > > >> - "However, Samza currently is not able to
> > handle
> > > >> > > > partition
> > > >> > > > > > > > > expansion
> > > >> > > > > > > > > > of
> > > >> > > > > > > > > > > >> the input streams"==>better point out "for
> > > stateful
> > > >> > > jobs".
> > > >> > > > > For
> > > >> > > > > > > > > > stateless
> > > >> > > > > > > > > > > >> jobs, simply bouncing the job now can pick up
> > the
> > > >> new
> > > >> > > > > > > partitions.
> > > >> > > > > > > > > > > >> - "it is possible (e.g. with Kafka) that
> > messages
> > > >> > with a
> > > >> > > > > given
> > > >> > > > > > > key
> > > >> > > > > > > > > > > exists
> > > >> > > > > > > > > > > >> in both partition 1 and 3. Because
> > > GroupByPartition
> > > >> > will
> > > >> > > > > assign
> > > >> > > > > > > > > > > partition 1
> > > >> > > > > > > > > > > >> and 3 to different tasks, messages with the
> > same
> > > >> key
> > > >> > may
> > > >> > > > be
> > > >> > > > > > > > handled
> > > >> > > > > > > > > by
> > > >> > > > > > > > > > > >> different task/container/process and their
> > state
> > > >> will
> > > >> > be
> > > >> > > > > > stored
> > > >> > > > > > > in
> > > >> > > > > > > > > > > >> different changelog partition." The problem
> > > >> statement
> > > >> > is
> > > >> > > > not
> > > >> > > > > > > super
> > > >> > > > > > > > > > clear
> > > >> > > > > > > > > > > >> here. The issue with stateful jobs is: after
> > > >> > > > > GroupByPartition
> > > >> > > > > > > > > assigns
> > > >> > > > > > > > > > > >> partition 1 and 3 to different tasks, the new
> > > task
> > > >> > > > handling
> > > >> > > > > > > > > partition
> > > >> > > > > > > > > > 3
> > > >> > > > > > > > > > > >> does not have the previous state to resume
> the
> > > >> work.
> > > >> > > e.g.
> > > >> > > > a
> > > >> > > > > > > > page-key
> > > >> > > > > > > > > > > based
> > > >> > > > > > > > > > > >> counter would start from 0 in the new task
> for
> > a
> > > >> > > specific
> > > >> > > > > key,
> > > >> > > > > > > > > instead
> > > >> > > > > > > > > > > of
> > > >> > > > > > > > > > > >> resuming the previous count 50 held by task
> 1.
> > > >> > > > > > > > > > > >> - minor rewording: "the first solution in
> this
> > > doc"
> > > >> > ==>
> > > >> > > > "the
> > > >> > > > > > > > > solution
> > > >> > > > > > > > > > > >> proposed in this doc"
> > > >> > > > > > > > > > > >> - "Thus additional development work is needed
> > in
> > > >> Kafka
> > > >> > > to
> > > >> > > > > meet
> > > >> > > > > > > > this
> > > >> > > > > > > > > > > >> requirement" It would be good to link to a
> KIP
> > if
> > > >> and
> > > >> > > when
> > > >> > > > > it
> > > >> > > > > > > > exists
> > > >> > > > > > > > > > > >> - Instead of touching/deprecating the
> interface
> > > >> > > > > > > > > > > >> SystemStreamPartitionGrouper, I would
> recommend
> > > to
> > > >> > have
> > > >> > > a
> > > >> > > > > > > > different
> > > >> > > > > > > > > > > >> implementation class of the interface, which
> in
> > > the
> > > >> > > > > > constructor
> > > >> > > > > > > of
> > > >> > > > > > > > > the
> > > >> > > > > > > > > > > >> grouper, takes two parameters: a) the
> previous
> > > task
> > > >> > > number
> > > >> > > > > > read
> > > >> > > > > > > > from
> > > >> > > > > > > > > > the
> > > >> > > > > > > > > > > >> coordinator stream; b) the configured
> > > >> new-partition to
> > > >> > > > > > > > old-partition
> > > >> > > > > > > > > > > >> mapping policy. Then, the grouper's interface
> > > >> method
> > > >> > > stays
> > > >> > > > > the
> > > >> > > > > > > > same
> > > >> > > > > > > > > > and
> > > >> > > > > > > > > > > >> the
> > > >> > > > > > > > > > > >> behavior of the grouper is more configurable
> > > which
> > > >> is
> > > >> > > good
> > > >> > > > > to
> > > >> > > > > > > > > support
> > > >> > > > > > > > > > a
> > > >> > > > > > > > > > > >> broader set of use cases in addition to
> Kafka's
> > > >> > built-in
> > > >> > > > > > > partition
> > > >> > > > > > > > > > > >> expansion policies.
> > > >> > > > > > > > > > > >> - Minor renaming suggestion to the new
> grouper
> > > >> class
> > > >> > > > names:
> > > >> > > > > > > > > > > >> GroupByPartitionWithFixedTaskNum
> > > >> > > > > > > > > > > >> and GroupBySystemStreamPartitionWithFixedTaskNum
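
The grouper suggested above (constructor takes the previous task count plus a new-partition to old-partition mapping policy) could be sketched roughly as follows. This is only an illustration in Python, not Samza code: the class name FixedTaskNumGrouper, the function modulo_policy, and the hard-coded task count (which the real grouper would read from the coordinator stream) are all assumptions made for the example.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List


def modulo_policy(partition: int, previous_task_count: int) -> int:
    # Hypothetical mapping policy: fold a partition index onto an original task.
    return partition % previous_task_count


class FixedTaskNumGrouper:
    """Hypothetical grouper that keeps the task count fixed across expansions."""

    def __init__(self, previous_task_count: int,
                 mapping_policy: Callable[[int, int], int]):
        # previous_task_count stands in for the value read from the coordinator stream.
        self.previous_task_count = previous_task_count
        self.mapping_policy = mapping_policy

    def group(self, partitions: Iterable[int]) -> Dict[int, List[int]]:
        # Same interface shape before and after expansion: partitions in, task map out.
        tasks = defaultdict(list)
        for partition in sorted(partitions):
            task = self.mapping_policy(partition, self.previous_task_count)
            tasks[task].append(partition)
        return dict(tasks)


grouper = FixedTaskNumGrouper(previous_task_count=2, mapping_policy=modulo_policy)
before_expansion = grouper.group(range(2))  # two partitions, two tasks
after_expansion = grouper.group(range(4))   # four partitions, still two tasks
```

With two original tasks, partitions 1 and 3 end up in the same task after expansion, so a per-key counter held by that task can continue from its previous value instead of restarting from 0.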
> > > >> > > > > > > > > > > >>
> > > >> > > > > > > > > > > >> Thanks!
> > > >> > > > > > > > > > > >>
> > > >> > > > > > > > > > > >> - Yi
> > > >> > > > > > > > > > > >>
> > > >> > > > > > > > > > > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <
> > > >> > > > > > lindon...@gmail.com
> > > >> > > > > > > >
> > > >> > > > > > > > > > wrote:
> > > >> > > > > > > > > > > >>
> > > >> > > > > > > > > > > >> > Hey Navina,
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >> > Thanks much for the comment. Please see my
> > > >> response
> > > >> > > > below.
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >> > Regarding your biggest gripe with the SEP,
> I
> > > >> > > personally
> > > >> > > > > > think
> > > >> > > > > > > > the
> > > >> > > > > > > > > > > >> > operational requirements proposed in the KIP
> > are
> > > >> > pretty
> > > >> > > > > > general
> > > >> > > > > > > > and
> > > >> > > > > > > > > > > >> could be
> > > >> > > > > > > > > > > >> > easily enforced by other systems. The
> reason
> > is
> > > >> that
> > > >> > > the
> > > >> > > > > > > modulo
> > > >> > > > > > > > > > > >> operation
> > > >> > > > > > > > > > > >> > is pretty standard and the default option
> > when
> > > we
> > > >> > > choose
> > > >> > > > > > > > > partition.
> > > >> > > > > > > > > > > And
> > > >> > > > > > > > > > > >> > usually the underlying system allows user
> to
> > > >> select
> > > >> > > > > > arbitrary
> > > >> > > > > > > > > > > partition
> > > >> > > > > > > > > > > >> > number if it supports partition expansion.
> Do
> > > you
> > > >> > know
> > > >> > > > any
> > > >> > > > > > > > system
> > > >> > > > > > > > > > that
> > > >> > > > > > > > > > > >> does
> > > >> > > > > > > > > > > >> > not meet these two requirements?
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >> > Regarding your comment of the Motivation
> > > >> section, I
> > > >> > > > > renamed
> > > >> > > > > > > the
> > > >> > > > > > > > > > first
> > > >> > > > > > > > > > > >> > section as "Problem and Goal" and specified
> > > that
> > > >> > "*The
> > > >> > > > > goal
> > > >> > > > > > of
> > > >> > > > > > > > > this
> > > >> > > > > > > > > > > >> > proposal is to enable partition expansion
> of
> > > the
> > > >> > input
> > > >> > > > > > > > > streams*.". I
> > > >> > > > > > > > > > > >> also
> > > >> > > > > > > > > > > >> > put a sentence at the end of the Motivation
> > > >> section
> > > >> > > that
> > > >> > > > > > "*The
> > > >> > > > > > > > > > feature
> > > >> > > > > > > > > > > >> of
> > > >> > > > > > > > > > > >> > task expansion is out of the scope of this
> > > >> proposal
> > > >> > > and
> > > >> > > > > will
> > > >> > > > > > > be
> > > >> > > > > > > > > > > >> addressed
> > > >> > > > > > > > > > > >> > in a future SEP*". The second paragraph in
> > the
> > > >> > > > Motivation
> > > >> > > > > > > > section
> > > >> > > > > > > > > is
> > > >> > > > > > > > > > > >> mainly
> > > >> > > > > > > > > > > >> > used to explain the thinking process that
> we
> > > have
> > > >> > gone
> > > >> > > > > > > through,
> > > >> > > > > > > > > what
> > > >> > > > > > > > > > > >> other
> > > >> > > > > > > > > > > >> > alternatives we have considered, and what we plan
> > to
> > > >> do in
> > > >> > > > Samza
> > > >> > > > > > in
> > > >> > > > > > > > the
> > > >> > > > > > > > > > next
> > > >> > > > > > > > > > > >> step.
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >> > To answer your question why increasing the
> > > >> partition
> > > >> > > > > number
> > > >> > > > > > > will
> > > >> > > > > > > > > > > >> increase
> > > >> > > > > > > > > > > >> > the throughput of the kafka consumer in the
> > > >> > container,
> > > >> > > > > Kafka
> > > >> > > > > > > > > > consumer
> > > >> > > > > > > > > > > >> can
> > > >> > > > > > > > > > > >> > potentially fetch more data in one
> > > FetchResponse
> > > >> > with
> > > >> > > > more
> > > >> > > > > > > > > > partitions
> > > >> > > > > > > > > > > in
> > > >> > > > > > > > > > > >> > the FetchRequest. This is because we limit
> > the
> > > >> > maximum
> > > >> > > > > > amount
> > > >> > > > > > > of
> > > >> > > > > > > > > > data
> > > >> > > > > > > > > > > >> that
> > > >> > > > > > > > > > > >> > can be fetched for a given partition in the
> > > >> > > FetchResponse.
> > > >> > > > > > This
> > > >> > > > > > > by
> > > >> > > > > > > > > > > >> default is
> > > >> > > > > > > > > > > >> > set to 1 MB. And there is reason that we
> can
> > > not
> > > >> > > > > arbitrarily
> > > >> > > > > > > > bump
> > > >> > > > > > > > > up
> > > >> > > > > > > > > > > >> this
> > > >> > > > > > > > > > > >> > limit.
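
A back-of-the-envelope version of the fetch-size argument above, as a rough Python sketch. It assumes only the 1 MB per-partition default mentioned in the message and ignores any overall cap on the response size; the function name is made up for illustration.

```python
# 1 MB per-partition fetch limit, as stated in the message above.
PER_PARTITION_LIMIT_BYTES = 1 * 1024 * 1024


def max_fetch_response_bytes(num_partitions: int,
                             per_partition_limit: int = PER_PARTITION_LIMIT_BYTES) -> int:
    # Upper bound on data returned by one fetch when every partition is full.
    return num_partitions * per_partition_limit


one_partition = max_fetch_response_bytes(1)
four_partitions = max_fetch_response_bytes(4)
```

Under these assumptions, a consumer reading four partitions can receive up to four times as much data per fetch as a consumer reading one, which is the sense in which more partitions can raise consumer throughput.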
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >> > To answer your question how partition
> > expansion
> > > >> in
> > > >> > > Kafka
> > > >> > > > > > > impacts
> > > >> > > > > > > > > the
> > > >> > > > > > > > > > > >> > clients, Kafka consumer is able to
> > > automatically
> > > >> > > detect
> > > >> > > > > new
> > > >> > > > > > > > > > partition
> > > >> > > > > > > > > > > of
> > > >> > > > > > > > > > > >> > the topic and reassign all (both old and
> new)
> > > >> > > partitions
> > > >> > > > > > > across
> > > >> > > > > > > > > > > >> consumers
> > > >> > > > > > > > > > > >> > in the consumer group IF you tell consumer
> > the
> > > >> topic
> > > >> > > to
> > > >> > > > be
> > > >> > > > > > > > > > subscribed.
> > > >> > > > > > > > > > > >> But
> > > >> > > > > > > > > > > >> > consumer in Samza's container uses another
> > way
> > > of
> > > >> > > > > > > subscription.
> > > >> > > > > > > > > > > Instead
> > > >> > > > > > > > > > > >> of
> > > >> > > > > > > > > > > >> > subscribing to the topic, the consumer in
> > > Samza's
> > > >> > > > > container
> > > >> > > > > > > > > > subscribes
> > > >> > > > > > > > > > > >> to
> > > >> > > > > > > > > > > >> > the specific partitions of the topic. In
> this
> > > >> case,
> > > >> > if
> > > >> > > > new
> > > >> > > > > > > > > > partitions
> > > >> > > > > > > > > > > >> have
> > > >> > > > > > > > > > > >> > been added, Samza will need to explicitly
> > > >> subscribe
> > > >> > to
> > > >> > > > the
> > > >> > > > > > new
> > > >> > > > > > > > > > > >> partitions
> > > >> > > > > > > > > > > >> > of the topic. The "Handle partition
> expansion
> > > >> while
> > > >> > > > tasks
> > > >> > > > > > are
> > > >> > > > > > > > > > running"
> > > >> > > > > > > > > > > >> > section in the SEP addresses this issue in
> > > Samza
> > > >> --
> > > >> > it
> > > >> > > > > > > > > recalculates
> > > >> > > > > > > > > > > the
> > > >> > > > > > > > > > > >> job
> > > >> > > > > > > > > > > >> > model and restart container so that
> consumer
> > > can
> > > >> > > > subscribe
> > > >> > > > > > to
> > > >> > > > > > > > the
> > > >> > > > > > > > > > new
> > > >> > > > > > > > > > > >> > partitions.
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >> > I will ask other dev to take a look at the
> > > >> > proposal. I
> > > >> > > > > will
> > > >> > > > > > > > start
> > > >> > > > > > > > > > the
> > > >> > > > > > > > > > > >> > voting thread tomorrow if there is no
> further
> > > >> > concern
> > > >> > > > with
> > > >> > > > > > the
> > > >> > > > > > > > > SEP.
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >> > Thanks!
> > > >> > > > > > > > > > > >> > Dong
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >> > On Wed, May 31, 2017 at 12:01 AM, Navina
> > Ramesh
> > > >> > > > (Apache) <
> > > >> > > > > > > > > > > >> > nav...@apache.org>
> > > >> > > > > > > > > > > >> > wrote:
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >> > > Hey Dong,
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > >  I have updated the motivation section
> to
> > > >> > clarify
> > > >> > > > > this.
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > Thanks for updating the motivation.
> Couple
> > of
> > > >> > notes
> > > >> > > > > here:
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > 1.
> > > >> > > > > > > > > > > >> > > > "The motivation of increasing partition
> > > >> number
> > > >> > of
> > > >> > > > > Kafka
> > > >> > > > > > > > topic
> > > >> > > > > > > > > > > >> includes
> > > >> > > > > > > > > > > >> > 1)
> > > >> > > > > > > > > > > >> > > limit the maximum size of a partition in
> > > order
> > > >> to
> > > >> > > > > improve
> > > >> > > > > > > > broker
> > > >> > > > > > > > > > > >> > > performance and 2) increase throughput of
> > > Kafka
> > > >> > > > consumer
> > > >> > > > > > in
> > > >> > > > > > > > the
> > > >> > > > > > > > > > > Samza
> > > >> > > > > > > > > > > >> > > container."
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > It's unclear to me how increasing the
> > > partition
> > > >> > > number
> > > >> > > > > > will
> > > >> > > > > > > > > > increase
> > > >> > > > > > > > > > > >> the
> > > >> > > > > > > > > > > >> > > throughput of the kafka consumer in the
> > > >> container?
> > > >> > > > > > > > > Theoretically,
> > > >> > > > > > > > > > > you
> > > >> > > > > > > > > > > >> > will
> > > >> > > > > > > > > > > >> > > still be consuming the same amount of
> data
> > in
> > > >> the
> > > >> > > > > > container,
> > > >> > > > > > > > > > > >> irrespective
> > > >> > > > > > > > > > > >> > > of whether it is coming from one
> partition
> > or
> > > >> more
> > > >> > > > than
> > > >> > > > > > one
> > > >> > > > > > > > > > expanded
> > > >> > > > > > > > > > > >> > > partitions. Can you please explain it for
> > me
> > > >> here,
> > > >> > > > what
> > > >> > > > > > you
> > > >> > > > > > > > mean
> > > >> > > > > > > > > > by
> > > >> > > > > > > > > > > >> that?
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > 2. I believe the second paragraph under
> > > >> motivation
> > > >> > > is
> > > >> > > > > > simply
> > > >> > > > > > > > > > talking
> > > >> > > > > > > > > > > >> > about
> > > >> > > > > > > > > > > >> > > the scope of the current SEP. It will be
> > > >> easier to
> > > >> > > > read
> > > >> > > > > if
> > > >> > > > > > > > what
> > > >> > > > > > > > > > > >> solution
> > > >> > > > > > > > > > > >> > is
> > > >> > > > > > > > > > > >> > > included in this SEP and what is left out
> > as
> > > >> not
> > > >> > in
> > > >> > > > > scope.
> > > >> > > > > > > > (for
> > > >> > > > > > > > > > > >> example,
> > > >> > > > > > > > > > > >> > > expansions for stateful jobs is supported
> > or
> > > >> not).
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > > We need to persist the task-to-sspList
> > > >> mapping
> > > >> > in
> > > >> > > > the
> > > >> > > > > > > > > > > >> > > coordinator stream so that the job can
> > derive
> > > >> the
> > > >> > > > > original
> > > >> > > > > > > > > number
> > > >> > > > > > > > > > of
> > > >> > > > > > > > > > > >> > > partitions of each input stream
> regardless
> > of
> > > >> how
> > > >> > > many
> > > >> > > > > > times
> > > >> > > > > > > > the
> > > >> > > > > > > > > > > >> > partition
> > > >> > > > > > > > > > > >> > > has expanded. Does this make sense?
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > Yes. It does!
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > > I am not sure how this is related to
> the
> > > >> > locality
> > > >> > > > > > though.
> > > >> > > > > > > > Can
> > > >> > > > > > > > > > you
> > > >> > > > > > > > > > > >> > clarify
> > > >> > > > > > > > > > > >> > > your question if I haven't answered your
> > > >> question?
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > It's not related. I just meant to give an
> > > >> example
> > > >> > of
> > > >> > > > yet
> > > >> > > > > > > > another
> > > >> > > > > > > > > > > >> > > coordinator message that is persisted.
> Your
> > > >> > > > ssp-to-task
> > > >> > > > > > > > mapping
> > > >> > > > > > > > > is
> > > >> > > > > > > > > > > >> > > following a similar pattern for
> persisting.
> > > >> Just
> > > >> > > > wanted
> > > >> > > > > to
> > > >> > > > > > > > > clarify
> > > >> > > > > > > > > > > >> that.
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > > Can you let me know if this, together
> > with
> > > >> the
> > > >> > > > answers
> > > >> > > > > > in
> > > >> > > > > > > > the
> > > >> > > > > > > > > > > >> previous
> > > >> > > > > > > > > > > >> > > email, addresses all your questions?
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > Yes. I believe you have addressed most of
> > my
> > > >> > > > questions.
> > > >> > > > > > > Thanks
> > > >> > > > > > > > > for
> > > >> > > > > > > > > > > >> taking
> > > >> > > > > > > > > > > >> > > time to do that.
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > > Is there specific question you have
> > > regarding
> > > >> > > > > partition
> > > >> > > > > > > > > > > >> > > expansion in Kafka?
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > I guess my questions are on how partition
> > > >> > expansion
> > > >> > > in
> > > >> > > > > > Kafka
> > > >> > > > > > > > > > impacts
> > > >> > > > > > > > > > > >> the
> > > >> > > > > > > > > > > >> > > clients. Iiuc, partition expansions are
> > done
> > > >> > > manually
> > > >> > > > in
> > > >> > > > > > > Kafka
> > > >> > > > > > > > > > based
> > > >> > > > > > > > > > > >> on
> > > >> > > > > > > > > > > >> > the
> > > >> > > > > > > > > > > >> > > bytes-in rate of the partition. Do the
> > > existing
> > > >> > > kafka
> > > >> > > > > > > clients
> > > >> > > > > > > > > > handle
> > > >> > > > > > > > > > > >> this
> > > >> > > > > > > > > > > >> > > expansion automatically? if yes, how does
> > it
> > > >> work?
> > > >> > > If
> > > >> > > > > not,
> > > >> > > > > > > are
> > > >> > > > > > > > > > there
> > > >> > > > > > > > > > > >> > plans
> > > >> > > > > > > > > > > >> > > to support it in the future?
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > > Thus user's job should not need to
> > > bootstrap
> > > >> > > > key/value
> > > >> > > > > > > store
> > > >> > > > > > > > > > from
> > > >> > > > > > > > > > > >> the
> > > >> > > > > > > > > > > >> > > changelog topic.
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > Why is this discussion relevant here?
> > > Key/value
> > > >> > > store
> > > >> > > > /
> > > >> > > > > > > > > changelog
> > > >> > > > > > > > > > > >> topic
> > > >> > > > > > > > > > > >> > > partition is scoped with the context of a
> > > task.
> > > >> > > Since
> > > >> > > > we
> > > >> > > > > > are
> > > >> > > > > > > > not
> > > >> > > > > > > > > > > >> changing
> > > >> > > > > > > > > > > >> > > the number of tasks, I don't think it is
> > > >> required
> > > >> > to
> > > >> > > > > > mention
> > > >> > > > > > > > it
> > > >> > > > > > > > > > > here.
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > > The new method takes the
> > > >> > > > SystemStreamPartition-to-Task
> > > >> > > > > > > > > > assignment
> > > >> > > > > > > > > > > >> from
> > > >> > > > > > > > > > > >> > > the previous job model which can be read
> > from
> > > >> the
> > > >> > > > > > > coordinator
> > > >> > > > > > > > > > > stream.
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > Jobmodel is currently not persisted to
> > > >> coordinator
> > > >> > > > > stream.
> > > >> > > > > > > In
> > > >> > > > > > > > > your
> > > >> > > > > > > > > > > >> > design,
> > > >> > > > > > > > > > > >> > > you talk about writing separate
> coordinator
> > > >> > messages
> > > >> > > > for
> > > >> > > > > > > > > > ssp-to-task
> > > >> > > > > > > > > > > >> > > assignments. Hence, please correct this
> > > >> statement.
> > > >> > > It
> > > >> > > > is
> > > >> > > > > > > kind
> > > >> > > > > > > > of
> > > >> > > > > > > > > > > >> > misleading
> > > >> > > > > > > > > > > >> > > to the reader.
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > My biggest gripe with this SEP is that it
> > > seems
> > > >> > > like a
> > > >> > > > > > > > > tailor-made
> > > >> > > > > > > > > > > >> > solution
> > > >> > > > > > > > > > > >> > > that relies on the semantics of the Kafka
> > > >> system
> > > >> > and
> > > >> > > > > yet,
> > > >> > > > > > we
> > > >> > > > > > > > are
> > > >> > > > > > > > > > > >> trying
> > > >> > > > > > > > > > > >> > to
> > > >> > > > > > > > > > > >> > > masquerade that as operational
> requirements
> > > for
> > > >> > > other
> > > >> > > > > > > systems
> > > >> > > > > > > > > > > >> interacting
> > > >> > > > > > > > > > > >> > > with Samza. (Not to say that this is the
> > > first
> > > >> > time
> > > >> > > > > such a
> > > >> > > > > > > > > choice
> > > >> > > > > > > > > > is
> > > >> > > > > > > > > > > >> > being
> > > >> > > > > > > > > > > >> > > made in the Samza design). I am not
> seeing
> > > how
> > > >> > this
> > > >> > > > can
> > > >> > > > > be a
> > > >> > > > > > > > > > "general"
> > > >> > > > > > > > > > > >> > > solution for all input systems. That's my
> > two
> > > >> > > cents. I
> > > >> > > > > > would
> > > >> > > > > > > > > like
> > > >> > > > > > > > > > to
> > > >> > > > > > > > > > > >> hear
> > > >> > > > > > > > > > > >> > > alternative points of view for this from
> > > other
> > > >> > devs.
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > Please make sure you have enough eyes on
> > this
> > > >> SEP.
> > > >> > > If
> > > >> > > > > you
> > > >> > > > > > > do,
> > > >> > > > > > > > > > please
> > > >> > > > > > > > > > > >> > start
> > > >> > > > > > > > > > > >> > > a VOTE thread to approve this SEP.
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > Thanks!
> > > >> > > > > > > > > > > >> > > Navina
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong
> Lin
> > <
> > > >> > > > > > > > lindon...@gmail.com
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > > >> wrote:
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> > > > Hey Navina,
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > > I have updated the wiki based on your
> > > >> > suggestion.
> > > >> > > > More
> > > >> > > > > > > > > > > >> specifically, I
> > > >> > > > > > > > > > > >> > > have
> > > >> > > > > > > > > > > >> > > > made the following changes:
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > > - Improved Problem section and
> Motivation
> > > >> > section
> > > >> > > to
> > > >> > > > > > > > describe
> > > >> > > > > > > > > > why
> > > >> > > > > > > > > > > we
> > > >> > > > > > > > > > > >> > use
> > > >> > > > > > > > > > > >> > > > the solution in this proposal instead
> of
> > > >> > tackling
> > > >> > > > the
> > > >> > > > > > > > problem
> > > >> > > > > > > > > of
> > > >> > > > > > > > > > > >> task
> > > >> > > > > > > > > > > >> > > > expansion directly.
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > > - Illustrate the design in a way that
> > > doesn't
> > > >> > bind
> > > >> > > > to
> > > >> > > > > > > Kafka.
> > > >> > > > > > > > > > Kafka
> > > >> > > > > > > > > > > >> is
> > > >> > > > > > > > > > > >> > > only
> > > >> > > > > > > > > > > >> > > > used as example to illustrate why we
> want
> > > to
> > > >> > > support
> > > >> > > > > > > > partition
> > > >> > > > > > > > > > > >> expansion
> > > >> > > > > > > > > > > >> > > and
> > > >> > > > > > > > > > > >> > > > whether the operational requirement can
> > be
> > > >> > > supported
> > > >> > > > > > when
> > > >> > > > > > > > > Kafka
> > > >> > > > > > > > > > is
> > > >> > > > > > > > > > > >> used
> > > >> > > > > > > > > > > >> > > as
> > > >> > > > > > > > > > > >> > > > the input system. Note that the
> proposed
> > > >> > solution
> > > >> > > > > should
> > > >> > > > > > > > work
> > > >> > > > > > > > > > for
> > > >> > > > > > > > > > > >> any
> > > >> > > > > > > > > > > >> > > input
> > > >> > > > > > > > > > > >> > > > system that meets the operational
> > > requirement
> > > >> > > > > described
> > > >> > > > > > in
> > > >> > > > > > > > the
> > > >> > > > > > > > > > > wiki.
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > > - Fixed the problem in the figure.
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > > - Added a new class
> > > >> > GroupBySystemStreamPartitionFixedTaskNum
> > > >> > > > > > > > > to
> > > >> > > > > > > > > > > the
> > > >> > > > > > > > > > > >> > > wiki.
> > > >> > > > > > > > > > > >> > > > Together with
> > GroupByPartitionFixedTaskNum,
> > > >> it
> > > >> > > > should
> > > >> > > > > > > ensure
> > > >> > > > > > > > > > that
> > > >> > > > > > > > > > > we
> > > >> > > > > > > > > > > >> > > have a
> > > >> > > > > > > > > > > >> > > > solution to enable partition expansion
> > for
> > > >> all
> > > >> > > users
> > > >> > > > > > that
> > > >> > > > > > > > are
> > > >> > > > > > > > > > > using
> > > >> > > > > > > > > > > >> > > > pre-defined grouper in Samza. Note that
> > > those
> > > >> > > users
> > > >> > > > > who
> > > >> > > > > > > use
> > > >> > > > > > > > > > custom
> > > >> > > > > > > > > > > >> > > grouper
> > > >> > > > > > > > > > > >> > > > would need to update their
> > implementation.
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > > Can you let me know if this, together
> > with
> > > >> the
> > > >> > > > answers
> > > >> > > > > > in
> > > >> > > > > > > > the
> > > >> > > > > > > > > > > >> previous
> > > >> > > > > > > > > > > >> > > > email, addresses all your questions?
> > Thanks
> > > >> for
> > > >> > > > taking
> > > >> > > > > > > time
> > > >> > > > > > > > to
> > > >> > > > > > > > > > > >> review
> > > >> > > > > > > > > > > >> > the
> > > >> > > > > > > > > > > >> > > > proposal.
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > > Regards,
> > > >> > > > > > > > > > > >> > > > Dong
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong
> > Lin
> > > <
> > > >> > > > > > > > > lindon...@gmail.com
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > >> > wrote:
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > > > > Hey Navina,
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > > Thanks much for your comments. Please
> > see
> > > >> my
> > > >> > > reply
> > > >> > > > > > > inline.
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > > On Wed, May 24, 2017 at 10:22 AM,
> > Navina
> > > >> > Ramesh
> > > >> > > > > > > (Apache) <
> > > >> > > > > > > > > > > >> > > > > nav...@apache.org> wrote:
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >> Thanks for the SEP, Dong. I have a
> > > couple
> > > >> of
> > > >> > > > > > questions
> > > >> > > > > > > to
> > > >> > > > > > > > > > > >> understand
> > > >> > > > > > > > > > > >> > > > your
> > > >> > > > > > > > > > > >> > > > >> proposal better:
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >> * Under motivation, you mention that
> > > "_We
> > > >> > > expect
> > > >> > > > > this
> > > >> > > > > > > > > > solution
> > > >> > > > > > > > > > > to
> > > >> > > > > > > > > > > >> > work
> > > >> > > > > > > > > > > >> > > > >> similarly with other input system as
> > > >> well._",
> > > >> > > > yet I
> > > >> > > > > > > don't
> > > >> > > > > > > > > see
> > > >> > > > > > > > > > > any
> > > >> > > > > > > > > > > >> > > > >> discussion on how it will work with
> > > other
> > > >> > input
> > > >> > > > > > > systems.
> > > >> > > > > > > > > That
> > > >> > > > > > > > > > > is,
> > > >> > > > > > > > > > > >> > what
> > > >> > > > > > > > > > > >> > > > >> kind
> > > >> > > > > > > > > > > >> > > > >> of contract does samza expect from
> > other
> > > >> > input
> > > >> > > > > > systems
> > > >> > > > > > > ?
> > > >> > > > > > > > If
> > > >> > > > > > > > > > we
> > > >> > > > > > > > > > > >> are
> > > >> > > > > > > > > > > >> > not
> > > >> > > > > > > > > > > >> > > > >> planning to provide a generic
> > solution,
> > > it
> > > >> > > might
> > > >> > > > be
> > > >> > > > > > > worth
> > > >> > > > > > > > > > > >> calling it
> > > >> > > > > > > > > > > >> > > out
> > > >> > > > > > > > > > > >> > > > >> in
> > > >> > > > > > > > > > > >> > > > >> the SEP.
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > > I think the contract we expect from
> > other
> > > >> > > systems
> > > >> > > > > are
> > > >> > > > > > > > > exactly
> > > >> > > > > > > > > > > the
> > > >> > > > > > > > > > > >> > > > > operational requirement mentioned in
> > the
> > > >> SEP,
> > > >> > > i.e.
> > > >> > > > > > > > > partitions
> > > >> > > > > > > > > > > >> should
> > > >> > > > > > > > > > > >> > > > always
> > > >> > > > > > > > > > > >> > > > > be doubled and the hash algorithm
> > should
> > > >> > be modulo
> > > >> > > > the
> > > >> > > > > > > number
> > > >> > > > > > > > > of
> > > >> > > > > > > > > > > >> > > partitions.
> > > >> > > > > > > > > > > >> > > > > SEP-5 should also allow partition
> > > >> expansion of
> > > >> > > all
> > > >> > > > > > input
> > > >> > > > > > > > > > systems
> > > >> > > > > > > > > > > >> that
> > > >> > > > > > > > > > > >> > > > meet
> > > >> > > > > > > > > > > >> > > > > these two requirements. I have
> updated
> > > the
> > > >> > > > > motivation
> > > >> > > > > > > > > section
> > > >> > > > > > > > > > to
> > > >> > > > > > > > > > > >> > > clarify
> > > >> > > > > > > > > > > >> > > > > this.
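
The two operational requirements above (the partition count is always doubled, and the partitioner takes a hash modulo the partition count) can be checked with a small Python sketch. The CRC32 hash and the names partition_for and task_for are assumptions for illustration only; the point is just that folding the expanded partition index back with a modulo of the original count leaves each key on its original task.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # Assumed producer-side partitioner: stable hash modulo the partition count.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def task_for(partition: int, original_partitions: int) -> int:
    # Assumed grouper rule: fold expanded partitions back onto the original tasks.
    return partition % original_partitions


original = 2
keys = [f"page-{i}" for i in range(1000)]

# Doubling once (4) or twice (8) never moves a key to a different task.
stable = all(
    task_for(partition_for(key, expanded), original)
    == task_for(partition_for(key, original), original)
    for expanded in (2, 4, 8)
    for key in keys
)
```

This works because the original count divides every doubled count, so (h mod 2n) mod n equals h mod n for any hash value h. An expansion to a count that is not a multiple of the original (for example 2 to 3) would not give this guarantee.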
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >> * I understand the partition mapping
> > > logic
> > > >> > you
> > > >> > > > have
> > > >> > > > > > > > > proposed.
> > > >> > > > > > > > > > > >> But I
> > > >> > > > > > > > > > > >> > > > think
> > > >> > > > > > > > > > > >> > > > >> the example explanation doesn't
> match
> > > the
> > > >> > > > diagram.
> > > >> > > > > In
> > > >> > > > > > > the
> > > >> > > > > > > > > > > >> diagram,
> > > >> > > > > > > > > > > >> > > after
> > > >> > > > > > > > > > > >> > > > >> expansion, partition-0 and
> partition-1
> > > are
> > > >> > > > pointing
> > > >> > > > > to
> > > >> > > > > > > > > bucket
> > > >> > > > > > > > > > 0
> > > >> > > > > > > > > > > >> and
> > > >> > > > > > > > > > > >> > > > >> partition-3 and partition-4 are
> > pointing
> > > >> to
> > > >> > > > bucket
> > > >> > > > > > 1. I
> > > >> > > > > > > > > think
> > > >> > > > > > > > > > > the
> > > >> > > > > > > > > > > >> > > former
> > > >> > > > > > > > > > > >> > > > >> has to be partition-0 and
> partition-2
> > > and
> > > >> the
> > > >> > > > > latter,
> > > >> > > > > > > is
> > > >> > > > > > > > > > > >> partition-1
> > > >> > > > > > > > > > > >> > > and
> > > >> > > > > > > > > > > >> > > > >> partition-3. If I am wrong, please
> > help
> > > me
> > > >> > > > > understand
> > > >> > > > > > > the
> > > >> > > > > > > > > > logic
> > > >> > > > > > > > > > > >> :)
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > > Good catch. I will update the figure
> to
> > > fix
> > > >> > this
> > > >> > > > > > > problem.
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >> * I don't know how partition
> expansion
> > > in
> > > >> > Kafka
> > > >> > > > > > works.
> > > >> > > > > > > I
> > > >> > > > > > > > am
> > > >> > > > > > > > > > > >> familiar
> > > >> > > > > > > > > > > >> > > > with
> > > >> > > > > > > > > > > >> > > > >> how shard splitting happens in
> > Kinesis -
> > > >> > there
> > > >> > > is
> > > >> > > > > > > > > > hierarchical
> > > >> > > > > > > > > > > >> > > relation
> > > >> > > > > > > > > > > >> > > > >> between the parent and child shards.
> > > This
> > > >> > way,
> > > >> > > it
> > > >> > > > > > will
> > > >> > > > > > > > also
> > > >> > > > > > > > > > > allow
> > > >> > > > > > > > > > > >> > the
> > > >> > > > > > > > > > > >> > > > >> shards to be merged back. Iiuc,
> Kafka
> > > only
> > > >> > > > supports
> > > >> > > > > > > > > partition
> > > >> > > > > > > > > > > >> > > > "expansion",
> > > >> > > > > > > > > > > >> > > > >> as opposed to "splits". Can you
> > provide
> > > >> some
> > > >> > > > > context
> > > >> > > > > > or
> > > >> > > > > > > > > link
> > > >> > > > > > > > > > > >> related
> > > >> > > > > > > > > > > >> > > to
> > > >> > > > > > > > > > > >> > > > >> how
> > > >> > > > > > > > > > > >> > > > >> partition expansion works in Kafka?
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > > I couldn't find any wiki on partition
> > > >> > expansion
> > > >> > > in
> > > >> > > > > > > Kafka.
> > > >> > > > > > > > > The
> > > >> > > > > > > > > > > >> > partition
> > > >> > > > > > > > > > > >> > > > > expansion logic in Kafka is very
> simple
> > > --
> > > >> it
> > > >> > > > simply
> > > >> > > > > > > adds
> > > >> > > > > > > > > new
> > > >> > > > > > > > > > > >> > partitions
> > > >> > > > > > > > > > > >> > > > to
> > > >> > > > > > > > > > > >> > > > > the existing topic. Is there a specific
> > > >> question
> > > >> > > you
> > > >> > > > > > have
> > > >> > > > > > > > > > > regarding
> > > >> > > > > > > > > > > >> > > > partition
> > > >> > > > > > > > > > > >> > > > > expansion in Kafka?
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >> * Are you only recommending that
> > > expansion
> > > >> > can
> > > >> > > be
> > > >> > > > > > > > supported
> > > >> > > > > > > > > > for
> > > >> > > > > > > > > > > >> > samza
> > > >> > > > > > > > > > > >> > > > jobs
> > > >> > > > > > > > > > > >> > > > >> that use Kafka as input systems
> > **and**
> > > >> > > configure
> > > >> > > > > the
> > > >> > > > > > > > > > > SSPGrouper
> > > >> > > > > > > > > > > >> as
> > > >> > > > > > > > > > > >> > > > >> GroupByPartitionFixedTaskNum? Sounds
> > to
> > > me
> > > >> > like
> > > >> > > > > this
> > > >> > > > > > > only
> > > >> > > > > > > > > > > applies
> > > >> > > > > > > > > > > >> > for
> > > >> > > > > > > > > > > >> > > > >> GroupByPartition. Please correct me
> > if I
> > > >> am
> > > >> > > > wrong.
> > > >> > > > > > What
> > > >> > > > > > > > is
> > > >> > > > > > > > > > the
> > > >> > > > > > > > > > > >> > > > expectation
> > > >> > > > > > > > > > > >> > > > >> for custom SSP Groupers?
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > > The expansion can be supported for
> > Samza
> > > >> jobs
> > > >> > if
> > > >> > > > the
> > > >> > > > > > > input
> > > >> > > > > > > > > > > system
> > > >> > > > > > > > > > > >> > meets
> > > >> > > > > > > > > > > >> > > > > the operational requirement mentioned
> > > >> above.
> > > >> > It
> > > >> > > > > > doesn't
> > > >> > > > > > > > have
> > > >> > > > > > > > > > to
> > > >> > > > > > > > > > > >> use
> > > >> > > > > > > > > > > >> > > Kafka
> > > >> > > > > > > > > > > >> > > > > as input system.
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > > The current proposal provides a
> solution
> > > for
> > > >> > jobs
> > > >> > > > that
> > > >> > > > > > > > > currently
> > > >> > > > > > > > > > > use
> > > >> > > > > > > > > > > >> > > > > GroupByPartition. The proposal can be
> > > >> extended
> > > >> > > to
> > > >> > > > > > > support
> > > >> > > > > > > > > jobs
> > > >> > > > > > > > > > > >> that
> > > >> > > > > > > > > > > >> > use
> > > >> > > > > > > > > > > >> > > > > other groupers that are pre-defined in
> > > >> Samza.
> > > >> > The
> > > >> > > > > > custom
> > > >> > > > > > > > SSP
> > > >> > > > > > > > > > > >> grouper
> > > >> > > > > > > > > > > >> > > needs
> > > >> > > > > > > > > > > >> > > > > to handle partition expansion similar
> > to
> > > >> how
> > > >> > > > > > > > > > > >> > > GroupByPartitionFixedTaskNum
> > > >> > > > > > > > > > > >> > > > > handles it and it is users'
> > > responsibility
> > > >> to
> > > >> > > > update
> > > >> > > > > > > their
> > > >> > > > > > > > > > > custom
> > > >> > > > > > > > > > > >> > > grouper
> > > >> > > > > > > > > > > >> > > > > implementation.
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >> * Regarding storing SSP-to-Task
> > > >> assignment to
> > > >> > > > > > > coordinator
> > > >> > > > > > > > > > > stream:
> > > >> > > > > > > > > > > >> > > Today,
> > > >> > > > > > > > > > > >> > > > >> the JobModel encapsulates the data
> > model
> > > >> in
> > > >> > > samza
> > > >> > > > > > which
> > > >> > > > > > > > > also
> > > >> > > > > > > > > > > >> > includes
> > > >> > > > > > > > > > > >> > > > >> **TaskModels**. TaskModel, typically
> > > shows
> > > >> > the
> > > >> > > > > > > > > > task-to-sspList
> > > >> > > > > > > > > > > >> > > mapping.
> > > >> > > > > > > > > > > >> > > > >> What is the reason for using a
> > separate
> > > >> > > > coordinator
> > > >> > > > > > > > stream
> > > >> > > > > > > > > > > >> message
> > > >> > > > > > > > > > > >> > > > >> *SetSSPTaskMapping*? Is it because
> the
> > > >> > JobModel
> > > >> > > > > > itself
> > > >> > > > > > > is
> > > >> > > > > > > > > not
> > > >> > > > > > > > > > > >> > > persisted
> > > >> > > > > > > > > > > >> > > > in
> > > >> > > > > > > > > > > >> > > > >> the coordinator stream today?  The
> > > reason
> > > >> > > > locality
> > > >> > > > > > > exists
> > > >> > > > > > > > > > > >> outside of
> > > >> > > > > > > > > > > >> > > the
> > > >> > > > > > > > > > > >> > > > >> jobmodel is because *locality*
> > > >> information is
> > > >> > > > > written
> > > >> > > > > > > by
> > > >> > > > > > > > > each
> > > >> > > > > > > > > > > >> > > container,
> > > >> > > > > > > > > > > >> > > > >> whereas it is consumed only by the
> > > leader
> > > >> > > > > > > > > jobcoordinator/AM.
> > > >> > > > > > > > > > > In
> > > >> > > > > > > > > > > >> > this
> > > >> > > > > > > > > > > >> > > > >> case,
> > > >> > > > > > > > > > > >> > > > >> the writer of the mapping
> information
> > > and
> > > >> the
> > > >> > > > > reader
> > > >> > > > > > is
> > > >> > > > > > > > > still
> > > >> > > > > > > > > > > the
> > > >> > > > > > > > > > > >> > > leader
> > > >> > > > > > > > > > > >> > > > >> jobcoordinator/AM. So, I want to
> > > >> understand
> > > >> > the
> > > >> > > > > > > > motivation
> > > >> > > > > > > > > > for
> > > >> > > > > > > > > > > >> this
> > > >> > > > > > > > > > > >> > > > >> choice.
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > > Yes, the reason for using a separate
> > > >> > coordinator
> > > >> > > > > stream
> > > >> > > > > > > > > message
> > > >> > > > > > > > > > > is
> > > >> > > > > > > > > > > >> > > because
> > > >> > > > > > > > > > > >> > > > > the task-to-sspList mapping is not
> > > >> currently
> > > >> > > > > persisted
> > > >> > > > > > > in
> > > >> > > > > > > > > the
> > > >> > > > > > > > > > > >> > > coordinator
> > > >> > > > > > > > > > > >> > > > > stream. We wouldn't need to create
> this
> > > new
> > > >> > > stream
> > > >> > > > > > > message
> > > >> > > > > > > > > if
> > > >> > > > > > > > > > > >> > JobModel
> > > >> > > > > > > > > > > >> > > is
> > > >> > > > > > > > > > > >> > > > > persisted. We need to persist the
> > > >> > > task-to-sspList
> > > >> > > > > > > mapping
> > > >> > > > > > > > in
> > > >> > > > > > > > > > the
> > > >> > > > > > > > > > > >> > > > > coordinator stream so that the job
> can
> > > >> derive
> > > >> > > the
> > > >> > > > > > > original
> > > >> > > > > > > > > > > number
> > > >> > > > > > > > > > > >> of
> > > >> > > > > > > > > > > >> > > > > partitions of each input stream
> > > regardless
> > > >> of
> > > >> > > how
> > > >> > > > > many
> > > >> > > > > > > > times
> > > >> > > > > > > > > > the
> > > >> > > > > > > > > > > >> > > > partition
> > > >> > > > > > > > > > > >> > > > > has expanded. Does this make sense?
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > > I am not sure how this is related to
> > the
> > > >> > > locality
> > > >> > > > > > > though.
> > > >> > > > > > > > > Can
> > > >> > > > > > > > > > > you
> > > >> > > > > > > > > > > >> > > clarify
> > > >> > > > > > > > > > > >> > > > > your question if I haven't answered
> > your
> > > >> > > question?
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > > Thanks!
> > > >> > > > > > > > > > > >> > > > > Dong
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >> Cheers!
> > > >> > > > > > > > > > > >> > > > >> Navina
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >> On Tue, May 23, 2017 at 5:45 PM,
> Dong
> > > Lin
> > > >> <
> > > >> > > > > > > > > > lindon...@gmail.com
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >> > > wrote:
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >> > Hi all,
> > > >> > > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > > >> > > > >> > We created SEP-5: Enable partition
> > > >> > expansion
> > > >> > > of
> > > >> > > > > > input
> > > >> > > > > > > > > > > streams.
> > > >> > > > > > > > > > > >> > > Please
> > > >> > > > > > > > > > > >> > > > >> find
> > > >> > > > > > > > > > > >> > > > >> > the SEP wiki in the link
> > > >> > > > > > > > > > > >> > > > >> > https://cwiki.apache.org/
> > > >> > > > > > > confluence/display/SAMZA/SEP-
> > > >> > > > > > > > > > > >> > > > >> > 5%3A+Enable+partition+
> > > >> > > > expansion+of+input+streams
> > > >> > > > > > > > > > > >> > > > >> > .
> > > >> > > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > > >> > > > >> > Your feedback is appreciated!
> > > >> > > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > > >> > > > >> > Thanks,
> > > >> > > > > > > > > > > >> > > > >> > Dong
> > > >> > > > > > > > > > > >> > > > >> >
> > > >> > > > > > > > > > > >> > > > >>
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > > >
> > > >> > > > > > > > > > > >> > > >
> > > >> > > > > > > > > > > >> > >
> > > >> > > > > > > > > > > >> >
> > > >> > > > > > > > > > > >>
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > > >
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> We are hiring in Streams Infra (Kafka/Samza/Datastream) !!
> > > >>
> > > >
> > > >
> > >

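For reference, the corrected bucket mapping discussed in the quoted thread
(after expansion, partition-0 and partition-2 point to bucket 0, and
partition-1 and partition-3 point to bucket 1) can be sketched as a modulo on
the partition count from before the expansion. This is only an illustration,
not the actual GroupByPartitionFixedTaskNum code: the function names below are
hypothetical, and it assumes that partitions are numbered from 0 and that the
original partition count can be recovered (for example from the persisted
task-to-sspList mapping).

```python
from typing import Dict, List


def task_for_partition(partition: int, original_partition_count: int) -> int:
    """Return the task (bucket) index for a partition.

    Hypothetical helper: uses the partition count from before any expansion,
    so a partition added later lands in the same bucket as the original
    partition it is congruent to.
    """
    if original_partition_count <= 0:
        raise ValueError("original_partition_count must be positive")
    if partition < 0:
        raise ValueError("partition must be non-negative")
    return partition % original_partition_count


def group_partitions(current_partition_count: int,
                     original_partition_count: int) -> Dict[int, List[int]]:
    """Group all current partitions by task while keeping the task count fixed."""
    groups: Dict[int, List[int]] = {t: [] for t in range(original_partition_count)}
    for p in range(current_partition_count):
        groups[task_for_partition(p, original_partition_count)].append(p)
    return groups


if __name__ == "__main__":
    # Two original partitions expanded to four.
    print(group_partitions(4, 2))
```

With two original partitions expanded to four, this groups partitions 0 and 2
under task 0 and partitions 1 and 3 under task 1, which matches the correction
to the figure agreed on in the thread.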