Hi, Dong and everyone,

Thanks for the detailed discussion on SEP-5! Really appreciate the thorough
consideration on this issue. I also noticed that Dong has updated the SEP-5
wiki to clarify:
1) SEP-5 provides a solution that retains the same number of tasks/states
w/o re-partitioning (as illustrated in the stateful join example)
2) Future work to expand the number of tasks needs to work together with
flexible re-partitioning to provide a complete solution
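
To make point 1 concrete, here is a minimal sketch of the fixed-task idea
as I read it. It assumes the producer picks a partition as
hash(key) % partitionCount with a non-negative hash, and that the partition
count only grows by an integer multiple of the original count. Under those
assumptions a key that lands in new partition p used to land in partition
p % originalCount, so p can be handed to that partition's existing task.
The class and method names are hypothetical and are not taken from SEP-5.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: keep the task count fixed while partitions expand.
final class FixedTaskGroupingSketch {
    private FixedTaskGroupingSketch() {}

    // Assumption: partition = hash(key) % partitionCount on the producer side.
    // If the count grows from N to k*N, then (hash % (k*N)) % N == hash % N,
    // so the owning task is the partition index modulo the original count.
    static int taskFor(int partition, int originalPartitionCount) {
        return partition % originalPartitionCount;
    }

    // Groups partitions 0..currentPartitionCount-1 into the original tasks.
    static Map<Integer, List<Integer>> group(int currentPartitionCount,
                                             int originalPartitionCount) {
        if (originalPartitionCount <= 0
                || currentPartitionCount % originalPartitionCount != 0) {
            throw new IllegalArgumentException(
                "current count must be a positive multiple of the original count");
        }
        Map<Integer, List<Integer>> tasks = new TreeMap<>();
        for (int p = 0; p < currentPartitionCount; p++) {
            tasks.computeIfAbsent(taskFor(p, originalPartitionCount),
                                  t -> new ArrayList<>()).add(p);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // The join example from this thread: one stream grows from 3 to 6
        // partitions while the job keeps 3 tasks.
        System.out.println(group(6, 3)); // prints {0=[0, 3], 1=[1, 4], 2=[2, 5]}
    }
}
```

If the producer uses a custom partitioner, or the expansion is not an
integer multiple, the modulo argument no longer holds, which is why the
sketch rejects such input instead of guessing a mapping.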

Expanding the number of tasks comes at a cost:
1) additional network I/O and latency from re-partitioning
2) shuffling of state among tasks
Given that cost, the current form of SEP-5 provides an alternative when
partition expansion in the messaging system is not driven by an increase
in the total input rate.

The concern about the added complexity in the grouper logic is valid.
However, the grouper-based solution is not completely unreasonable:
1) Grouper is a public interface and we already allow customized grouper
implementations, although that is not a main use case
2) Deprecating the existing config-driven grouper is a longer-term effort:
it has to wait until the fluent API has a better planner that
automatically figures out which grouper to use, and until stateful task
expansion is automated. Hence, for the foreseeable future, the grouper
will still be configured by the user.

So, in general, I am in favor of the proposed SEP-5, given that it provides
a path of least resistance to address some pain points for Samza users,
w/o breaking any existing use cases, since it is opt-in.

Some minor suggestions:
1) The class names are too long. Can we change them to
FixedTasksGroupByPartition and FixedTasksGroupBySystemStreamPartition?
2) I am still in favor of a configurable partition expansion policy (i.e.
a new<->old partition mapping), since it makes this solution more general
and not tied to Kafka. I am OK with defaulting to the power-of-2 expansion
policy and not introducing a new config variable now.
3) In the checkpoint/coordinator topic validation, the
KafkaCheckpointLogKey class checks that the current grouper factory class
== the grouper factory class recorded in the previous checkpoint. We need
to make sure that we allow compatible changes such as GroupByPartition to
FixedTasksGroupByPartition, etc. Since FixedTasksGroupByPartition is
derived from GroupByPartition, one possible solution is an assignability
check (i.e. accept the change if the current grouper factory class is
assignable to the previous grouper factory class).
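
As a rough illustration of the assignability check in suggestion 3, the
sketch below uses the standard java.lang.Class#isAssignableFrom method. The
three factory classes are empty, hypothetical stand-ins named after the
groupers discussed here; this is not the actual KafkaCheckpointLogKey code,
and it assumes the fixed-task factory extends the original factory.

```java
// Hypothetical stand-ins for the grouper factory classes in this thread.
class GroupByPartitionFactory {}
class FixedTasksGroupByPartitionFactory extends GroupByPartitionFactory {}
class GroupBySystemStreamPartitionFactory {}

final class GrouperCompatibility {
    private GrouperCompatibility() {}

    // Returns true when a checkpoint written with `previous` may be reused
    // by a job that is now configured with `current`.
    static boolean isCompatible(Class<?> previous, Class<?> current) {
        // True when `current` is the same class as, or a subclass of,
        // `previous`. A strict equality check would reject the subclass.
        return previous.isAssignableFrom(current);
    }

    public static void main(String[] args) {
        // Same class: allowed, as with the existing equality check.
        System.out.println(isCompatible(GroupByPartitionFactory.class,
                                        GroupByPartitionFactory.class)); // true
        // Switching to the derived fixed-task grouper: allowed.
        System.out.println(isCompatible(GroupByPartitionFactory.class,
                                        FixedTasksGroupByPartitionFactory.class)); // true
        // Switching to an unrelated grouper: rejected.
        System.out.println(isCompatible(GroupByPartitionFactory.class,
                                        GroupBySystemStreamPartitionFactory.class)); // false
    }
}
```

Note that the check is one-directional: moving from the fixed-task grouper
back to its parent class would be rejected, which may or may not be the
behavior we want.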

Thanks a lot!

On Wed, Jun 21, 2017 at 5:11 PM, Navina Ramesh (Apache) <nav...@apache.org>
wrote:

> > But IMO it is the best available solution towards the support of
> partition expansion in comparison to alternative, no?
>
> At this time, relative to the other alternatives you have listed, this is a
> path of least effort to solving this problem. I agree to that. :)
>
> > I can merge those two sections or update the statement if the current
> statement
> has not clearly explained the reason of partition expansion in Kafka.
>
> Given the significance of what you are actually trying to solve, I think it
> will be better to have it in points. Let me come find you and we can update
> it.
>
> > I have updated wiki and added the task expansion to the Future Work
> section.
> On the other hand I still keep it in the Rejected Alternative section to
> explain why this future work does not replace the existing proposal in
> SEP-5. Does this sound reasonable?
>
> It is very confusing to me how the same point can be under "Future Work"
> and "Rejected Alternative". There is no question about the future work
> *replacing* SEP-5. Iiuc, this SEP is a subset for the partition expansion
> solution. So, I don't think increasing task count should be a rejected
> alternative.
>
> > I am also not sure why a feature needs to be "utmost priority" in order
> to be accepted. Can you explain a bit on that?
>
> I don't think I ever claimed that the feature needs to be of "utmost
> priority" to be accepted. I was just stating my opinion.
>
>
> Thanks!
> Navina
>
> On Wed, Jun 21, 2017 at 3:52 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Thanks much for the reply Navina. Please see my reply inline.
> >
> > On Wed, Jun 21, 2017 at 2:57 PM, Navina Ramesh (Apache) <
> nav...@apache.org
> > >
> > wrote:
> >
> > > Thanks to Jake, Dong and Kartik for keeping the discussion going.
> > >
> > > > Here are the pros and cons of the extra re-partitioning stage in
> > > comparison
> > > to SEP-5.
> > >
> > > I think that is a good summary of the pros/cons for the repartitioning
> > > stage based solution. Can you please include it in your SEP? It seems
> > like
> > > you already have access. If you are still unable to access the wiki
> page,
> > > feel free to walk over to Samza area and find me!
> > >
> >
> > Sure. I have added this summary to the Alternative Section.
> >
> >
> > >
> > > > I think there is always a way for user to mess up their job if they
> > > configure the Samza job incorrectly.
> > >
> > > I don't think Jake or anyone is arguing about an "incorrectly"
> configured
> > > Samza job. The question was towards how easy/difficult it is for users
> to
> > > *not mess* up their job with incorrect configurations.
> > >
> > > > I also think the assumption made in this SEP is not particularly
> harder
> > > to understand than other existing configs in Samza.
> > >
> > > I disagree here. Other configs don't require you understand more than
> one
> > > assumption.
> > >
> > > There is already an overload of configs in Samza and I think we are
> > trying
> > > to shield it as much as possible from the users (esp. with fluent api).
> > > More specifically, we don't want the user to know about the internals
> of
> > > Samza such as the ssp grouper, taskname grouper etc. Since the proposed
> solution
> > > makes the configuration more complex to understand, it *is a* burden on
> > the
> > > user.
> > >
> > > Just because configs are the way it is, it doesn't mean we increase the
> > > complexity of it and push the burden on users to manage it correctly.
> My
> > > two cents.
> > >
> >
> > Sure, I agree the proposal requires user to understand the assumption in
> > order to expand the partition of the topic. But it is very subjective as
> to
> > whether the added complexity is acceptable or not. If there is better way
> > to allow user to expand partition of the input stream without making
> > assumption, then we can just do that. The current solution is not
> perfect.
> > But IMO it is the best available solution towards the support of
> partition
> > expansion in comparison to alternative, no?
> >
> >
> > > Here are a few things that I believe are needed for wrapping up the
> SEP:
> > >
> > > 1. For the longest time, I thought partition expansion happens in Kafka
> > > only when the volume of messages across partitions is too high. Based
> on
> > > this assumption, I would only assume that re-mapping expanded
> partitions
> > to
> > > the same task will have adverse effect on the throughput/resource
> > > utilization of the processor/container in Samza (for example, disk
> > > utilization may increase significantly. With disk quota throttling, it
> > > could cause the processor to drop.). However, after speaking with
> Xinyu,
> > it
> > > turns out that partition expansion also happens when there is a
> > > per-partition data retention limit imposed by Kafka (not sure if it is
> > only
> > > in LinkedIn or in Kafka open-source as well). Imo, this is the primary
> > > use-case that we are trying to solve for in Samza and it is not very
> > > obvious from the SEP.
> > > @Dong, can you please explain *the circumstances under which partition
> > > expansion can happen*, under "Motivation" section?  I disagree to the
> > > current motivation described as -> "This design doc provides a solution
> > to
> > > increase partition number of the input streams of a stateful Samza job
> > > while still ensuring the correctness of Samza job output. "
> > > This is a solution, albeit not fully done through this SEP alone.
> > >
> >
> > This is actually already described in the Problem and Goal section, i.e.
> > "For example, Kafka generally needs to limit the maximum size of each
> > partition to scale up its performance. Thus the number of partitions of a
> > Kafka topic needs to be expanded to reduce the partition size if the
> > average byte-in-rate or retention time of the Kafka topic has doubled". I
> > can merge those two sections or update the statement if the current
> > statement has not clearly explained the reason of partition expansion in
> > Kafka.
> >
> >
> > >
> > > 2. I think we are in consensus about the fact that increasing the task
> > > number and handling the state correctly is a good solution for Samza in
> > the
> > > long-run. In your rejected alternatives, you mention "However, this
> > feature
> > > alone does not solve the problem of allowing partition expansion.".
> What
> > > else is required to allow partition expansion? Can you please elaborate
> > on
> > > that in point #1 of the rejected alternatives? If there is still more
> > work
> > > to be done to support partition expansion in Samza, it is worthwhile to
> > > mention it under *Future Work*, instead of under "Rejected
> Alternatives".
> > > Perhaps you were waiting for edit permissions to the wiki. Please make
> > this
> > > change so it is well-tracked.
> > >
> >
> > I thought this is already explained in the rejected alternative section.
> > More specifically, it is said that "However, this feature alone does not
> > solve the problem of allowing partition expansion. For example, say we
> have
> > a job that joins two streams both of which have 3 partitions. If
> partition
> > number of one stream increases from 3 to 6, we would still want the task
> > number to remain 3 to make sure that messages with the same key from both
> > streams will be handled by the same task. This needs to be done with the
> > new grouper classes proposed in this doc."
> >
> > Does this explanation make sense?
> >
> > I have updated wiki and added the task expansion to the Future Work
> > section. On the other hand I still keep it in the Rejected Alternative
> > section to explain why this future work does not replace the existing
> > proposal in SEP-5. Does this sound reasonable?
> >
> >
> > > I am still not totally crazy about the proposed solution because it is
> > not
> > > clear for open-source, who or which use-cases stand to benefit. I am
> not
> > > convinced that this problem is of utmost priority for the Samza
> community
> > > *at this point of time*.
> > >
> >
> > I think the Problem and Goal section and the Motivation section have
> > illustrated the use-case for this feature. Let me answer your questions
> > more specifically:
> >
> > *Who will benefit from this feature:* any Samza user who runs stateful
> job
> > with input from Kafka and needs to expand partition of the input stream
> so
> > that the single partition size doesn't exceed a threshold.
> >
> > *Which use-case stands to benefit:* this SEP-5 is useful if user runs
> > stateful job with input from Kafka and needs to expand partition of the
> > input stream so that the single partition size doesn't exceed a
> threshold.
> >
> > *Why it is an important feature:* a user needs this feature if he runs
> > stateful job with input from Kafka and the partition size of Kafka has
> > become too large due to increase in throughput or increase in retention
> > time.
> >
> > I am not sure what kind of feature can be classified at "utmost
> priority".
> > I am also not sure why a feature needs to be "utmost priority" in order
> to
> > be accepted. Can you explain a bit on that? I think we should develop
> > feature that has a valid use-case.
> >
> >
> > > I am on the same page as Jake on this one. Not a +1, just a 0 (if that
> > even
> > > matters).
> > >
> > > Thanks!
> > > Navina
> > >
> > > On Sun, Jun 18, 2017 at 12:04 AM, Dong Lin <lindon...@gmail.com>
> wrote:
> > >
> > > > BTW, I will update the SEP-5 wiki with our latest discussion after I
> > have
> > > > got the wiki edit access.
> > > >
> > > > On Sat, Jun 17, 2017 at 11:36 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >
> > > > > Thanks everyone for the comment!
> > > > >
> > > > > I am currently leaning towards the current approach. I think Kartik
> > > > raised
> > > > > a good point that the extra repartitioning stage will also incur
> > > > additional
> > > > > throughput on Kafka in addition to the potential storage cost. Can
> > > other
> > > > > Samza developers also chime in and provide your opinions on this
> > > > proposal?
> > > > >
> > > > > Since this discussion thread has been open for three weeks, I will
> > > > > initiate voting thread on Monday if there is no major revision
> > > > suggestion.
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > > On Thu, Jun 15, 2017 at 6:32 PM, Kartik Paramasivam <
> > > > > kparamasi...@linkedin.com.invalid> wrote:
> > > > >
> > > > >> Great discussion !
> > > > >>
> > > > >> Here are some more thoughts
> > > > >>
> > > > >> The point that repartitioning is a more general purpose solution
> is
> > > > surely
> > > > >> spot on.  For many source systems (Kinesis, Google Pub-Sub, any of
> > the
> > > > >> older queuing systems (rabbitMQ etc. etc.), repartitioning is
> > anyways
> > > > >> functionally required to do even simple keyed aggregations.   But
> in
> > > > most
> > > > >> of these systems, the concept of repartitioning either does not
> > exist
> > > or
> > > > >> exists in a way which is very unique (e.g. Kinesis).
> > > > >>
> > > > >> I think this feature is really only interesting for source systems
> > > like
> > > > >> Kafka and EventHub.  EventHub (last I checked) didn't support
> > > > >> repartitioning. So this is probably not super-interesting (yet)
> for
> > > > >> EventHub.
> > > > >>
> > > > >> So Kafka is clearly the main use case here.
> > > > >>
> > > > >> For Kafka, I think it is pretty rare for people to customize the
> > > hashing
> > > > >> algorithm for sending messages.  I would argue that less than 5%
> of
> > > the
> > > > >> population (i am being generous ;)) would do that.   The current
> > > > proposal
> > > > >> works with the default hashing scheme for Kafka.  So organizations
> > > will
> > > > >> typically never have to coordinate.
> > > > >>
> > > > >> If the proposed alternative (always repartition) was side-effect
> > free,
> > > > >> then
> > > > >> it would make sense to use an alternative design that would work
> for
> > > > 100%
> > > > >> of the population.    Repartitioning all input would however not
> be
> > a
> > > > >> feasible solution (at least at LinkedIn) as it would double the
> kafka
> > > > >> workload.    If many samza jobs read from kafka topics, then the
> > > > increase
> > > > >> would be a function of the number of samza jobs.
> > > > >>
> > > > >> For low throughput kafka topics, surely explicit repartitioning
> > using
> > > > >> fluent api is feasible.
> > > > >>
> > > > >> If the proposal was to make this new policy the default then that
> > > would
> > > > >> clearly not make much sense.
> > > > >>
> > > > >> But it is an opt in policy.  If it is not applicable, people don't
> > > have
> > > > to
> > > > >> use it.
> > > > >>
> > > > >> I do have some questions about the implementation. I will try to
> > > respond
> > > > >> back after spending some more time on this.
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Thu, Jun 15, 2017 at 7:53 AM, Jacob Maes <jacob.m...@gmail.com
> >
> > > > wrote:
> > > > >>
> > > > >> > Thanks, Dong.
> > > > >> >
> > > > >> > The summary looks accurate.
> > > > >> >
> > > > >> > I'll let the others chime in, as I believe my perspective has
> been
> > > > >> > adequately captured in this thread.
> > > > >> >
> > > > >> > -Jake
> > > > >> >
> > > > >> > On Wed, Jun 14, 2017 at 12:12 PM, Dong Lin <lindon...@gmail.com
> >
> > > > wrote:
> > > > >> >
> > > > >> > > Hey Jacob,
> > > > >> > >
> > > > >> > > Thank you for taking so much time to discuss with me! I
> > appreciate
> > > > the
> > > > >> > > discussion and the insight. I will summarize our discussion
> > below.
> > > > >> > >
> > > > >> > > 1) Whether it is reasonable to store partition-to-task
> mapping.
> > > > >> > >
> > > > >> > > We agree that this partition-to-task mapping will be
> reasonable
> > if
> > > > we
> > > > >> > allow
> > > > >> > > user to specify either the new-partition-to-old-partition
> > mapping
> > > or
> > > > >> > > key-to-partition mapping in the future. SEP-5 doesn't
> currently
> > > > >> provide a
> > > > >> > > way for user to specify new-partition-to-old partition mapping
> > > > >> because we
> > > > >> > > don't have a good idea about that interface until we try to
> > enable
> > > > >> > > partition expansion for input system other than Kafka in the
> > > future.
> > > > >> This
> > > > >> > > is currently specified as the third future work in SEP-5.
> > > > >> > >
> > > > >> > > And if we decide to implement SEP-5, I will include a warning
> > > > message
> > > > >> > > regarding the use of partition-to-task, i.e. "this does not
> > > specify
> > > > >> the
> > > > >> > > key-to-task mapping". We agree that this could address the
> > concern
> > > > >> here.
> > > > >> > >
> > > > >> > > 2) Whether we should follow the approach in SEP-5 or use an
> > extra
> > > > >> > > re-partitioning stage in the stateful Samza job to enable
> > > partition
> > > > >> > > expansion.
> > > > >> > >
> > > > >> > > Here are the pros and cons of the extra re-partitioning stage
> in
> > > > >> > comparison
> > > > >> > > to SEP-5.
> > > > >> > >
> > > > >> > > Pros:
> > > > >> > > - It doesn't require owner of the Samza job to know the
> > > partitioning
> > > > >> > > algorithm used for the input stream. If the owner of the
> > Samza
> > > > job
> > > > >> is
> > > > >> > in
> > > > >> > > a different organization than the producer of the input
> stream,
> > > this
> > > > >> > > solution frees different organizations from having to
> coordinate
> > > > with
> > > > >> > each
> > > > >> > > other.
> > > > >> > > - It doesn't require owner of the Samza job to specify the
> > > > >> partitioning
> > > > >> > > algorithm used for the input stream. Thus less config.
> > > > >> > >
> > > > >> > > Cons:
> > > > >> > > - User has to make code change on their side to use the new
> > fluent
> > > > >> API.
> > > > >> > > - The extra partitioning stage would potentially increase
> > > latency.
> > > > >> > > - The extra partitioning stage would incur additional cost due
> > to
> > > > the
> > > > >> > extra
> > > > >> > > internal topic. The cost is probably not that much with the
> new
> > > > trim()
> > > > >> > API
> > > > >> > > in Kafka if Samza uses Kafka to store the internal topic. But
> > the
> > > > cost
> > > > >> > may
> > > > >> > > be doubled if Samza uses another input system that doesn't
> > provide
> > > > >> trim()
> > > > >> > > API to delete data on demand.
> > > > >> > >
> > > > >> > > My recommendation is to adopt a hybrid solution, i.e. we still
> > > > >> implement
> > > > >> > > the current proposal in SEP-5 so that we enable partition
> > > expansion
> > > > >> > without
> > > > >> > > incurring extra latency/cost and without requiring users to
> > change
> > > > >> their
> > > > >> > > code. And we can recommend user to use the extra partitioning
> > > stage
> > > > if
> > > > >> > the
> > > > >> > > coordination among different organization is indeed a concern.
> > > > >> > >
> > > > >> > > Can other developers also provide feedback regarding your
> > > preference
> > > > >> > > between the two?
> > > > >> > >
> > > > >> > > Thanks,
> > > > >> > > Dong
> > > > >> > >
> > > > >> > >
> > > > >> > >
> > > > >> > >
> > > > >> > > On Tue, Jun 13, 2017 at 9:30 AM, Jacob Maes <
> > jacob.m...@gmail.com
> > > >
> > > > >> > wrote:
> > > > >> > >
> > > > >> > > > Hey Dong,
> > > > >> > > >
> > > > >> > > > I appreciate your thoughtful responses. Let's do one more
> > round
> > > > :-)
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > > Here are my current concern with the three alternatives
> you
> > > > >> described
> > > > >> > > > > earlier:
> > > > >> > > > > - The first alternative requires support from input system
> > > which
> > > > >> is
> > > > >> > > > > currently not available. It will limit the usage of
> > partition
> > > > >> > expansion
> > > > >> > > > to
> > > > >> > > > > only systems that support such interface. And it is not
> > > > guaranteed
> > > > >> > that
> > > > >> > > > we
> > > > >> > > > > can persuade the developer of the input system to add this
> > > > >> interface.
> > > > >> > > > This
> > > > >> > > > > is not desirable for Samza in the long term.
> > > > >> > > >
> > > > >> > > > Agreed. It is very wishful thinking that each supported
> system
> > > > would
> > > > >> > > > provide such a contract.
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > >
> > > > >> > > > > - I can not comment on the second alternative because I
> > don't
> > > > >> > > understand
> > > > >> > > > > how it reshuffles all existing changelog data. We can
> > discuss
> > > > >> more if
> > > > >> > > > there
> > > > >> > > > > is more specific detail. My gut feel is that this will be
> > > > complex
> > > > >> and
> > > > >> > > > > carries performance overhead.
> > > > >> > > >
> > > > >> > > > After giving this more thought, I agree. There is no clear
> way
> > > to
> > > > >> > > migrate a
> > > > >> > > > changelog without knowing the original key->partition
> mapping.
> > > > Which
> > > > >> > > leads
> > > > >> > > > us to alternative 3...
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > >
> > > > >> > > > > - The third alternative requires performance overhead.
> Given
> > > > that
> > > > >> > user
> > > > >> > > > can
> > > > >> > > > > already use this solution to enable partition expansion,
> > maybe
> > > > >> Samza
> > > > >> > > > > developers can provide more input as to why we are not
> doing
> > > it
> > > > by
> > > > >> > > > default.
> > > > >> > > > > My gut feel is that it carries considerable performance
> > > overhead
> > > > >> and
> > > > >> > > > > increases the cost-to-serve Samza job (e.g. disk usage),
> > which
> > > > may
> > > > >> > make
> > > > >> > > > it
> > > > >> > > > > undesirable in the long term.
> > > > >> > > >
> > > > >> > > > I think the only performance overhead would be the mandatory
> > > > >> > > repartitioning
> > > > >> > > > stage for stateful jobs. But a repartitioner is usually much
> > > > faster
> > > > >> > than
> > > > >> > > > the downstream stateful job, so it only seems a cost to
> serve
> > > > issue.
> > > > >> > > >
> > > > >> > > > As for why we aren't already doing this, I would posit that
> > > before
> > > > >> the
> > > > >> > > > introduction of the high level API, which trivializes
> > > > >> repartitioning,
> > > > >> > it
> > > > >> > > > was unreasonable to expect each job owner to do the
> mandatory
> > > > >> > > > repartitioning. With the high level API, I would argue this
> is
> > > > much
> > > > >> > more
> > > > >> > > > doable.
> > > > >> > > >
> > > > >> > > >
> > > > >> > > > I am not sure it is true that "any future feature that
> > utilizes
> > > > this
> > > > >> > > > > mapping without accounting for the assumptions of this SEP
> > is
> > > > >> likely
> > > > >> > to
> > > > >> > > > > malfunction". Suppose we allow user to specify
> > > > >> new-to-old-partition
> > > > >> > > > > mapping, then we can use the partition-to-task mapping
> > > correctly
> > > > >> > > without
> > > > >> > > > > relying on the assumption in this SEP, right?
> > > > >> > > >
> > > > >> > > > Right, but my point was that the partition->task mapping is
> > not
> > > > >> > > sufficient
> > > > >> > > > by itself. So adding it by itself is potentially misleading.
> > > > >> > > >
> > > > >> > > > On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin <
> > lindon...@gmail.com>
> > > > >> wrote:
> > > > >> > > >
> > > > >> > > > > Thanks for the reply Jacob. Please see my comment inline.
> > > > >> > > > >
> > > > >> > > > > On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes <
> > > > jacob.m...@gmail.com
> > > > >> >
> > > > >> > > > wrote:
> > > > >> > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > - For users that need partition expansion of the input
> > > > streams
> > > > >> > for
> > > > >> > > > > > stateful
> > > > >> > > > > > > job, they have a really big headache in the sense that
> > > Samza
> > > > >> does
> > > > >> > > not
> > > > >> > > > > > allow
> > > > >> > > > > > > partition expansion for stateful job. SEP-5 addresses
> > this
> > > > >> > headache
> > > > >> > > > for
> > > > >> > > > > > > them.
> > > > >> > > > > > > You are right that SEP-5 requires user to understand
> and
> > > > >> enforce
> > > > >> > > > > > > limitations across organizations. But it is still much
> > > > better
> > > > >> > than
> > > > >> > > > not
> > > > >> > > > > > > allowing user to expand partitions for stateful jobs
> > at
> > > > all,
> > > > >> > > right?
> > > > >> > > > > > Did I
> > > > >> > > > > > > miss something here?
> > > > >> > > > > >
> > > > >> > > > > > I guess this one is a matter of perspective.
> > > > >> > > > > >
> > > > >> > > > > > One argument is that if the system supports one case,
> it's
> > > > >> better
> > > > >> > > than
> > > > >> > > > > none
> > > > >> > > > > > because there is one less scenario in which the system
> > does
> > > > the
> > > > >> > wrong
> > > > >> > > > > > thing.
> > > > >> > > > > >
> > > > >> > > > > > The counter argument is for uniform and consistent
> > behavior,
> > > > >> which
> > > > >> > is
> > > > >> > > > > easy
> > > > >> > > > > > for users to understand and properly leverage.
> > > > >> > > > > >
> > > > >> > > > > > Specifically, I'd argue that the current rule is very
> > > simple:
> > > > >> "you
> > > > >> > > > cannot
> > > > >> > > > > > repartition inputs on a stateful job, so you must
> > > > over-partition
> > > > >> > the
> > > > >> > > > > > initial implementation". To me, while that rule is not
> > > ideal,
> > > > >> its
> > > > >> > > > > > simplicity is better that introducing a new solution
> that
> > > has
> > > > a
> > > > >> > bunch
> > > > >> > > > of
> > > > >> > > > > > caveats, any one of which could be missed. If any one of
> > the
> > > > >> > > > assumptions
> > > > >> > > > > in
> > > > >> > > > > > this SEP design are violated, the job would behave
> > > > incorrectly.
> > > > >> > That
> > > > >> > > > > puts a
> > > > >> > > > > > lot more burden on the users than the simpler rule.
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > > > I agree that we have different perspective here. It is
> true
> > > that
> > > > >> user
> > > > >> > > > would
> > > > >> > > > > mess up their job if they used this feature in a wrong
> way,
> > > i.e.
> > > > >> > > violate
> > > > >> > > > > the assumption made in SEP-5. On the other hand, I think
> > there
> > > > is
> > > > >> > > always
> > > > >> > > > a
> > > > >> > > > > way for user to mess up their job if they configure the
> > Samza
> > > > job
> > > > >> > > > > incorrectly. I also think the assumption made in this SEP
> is
> > > not
> > > > >> > > > > particularly harder to understand than other existing
> > configs
> > > in
> > > > >> > Samza.
> > > > >> > > > >
> > > > >> > > > > The answer to this can be subjective. I would love to hear
> > > > >> > perspective
> > > > >> > > > from
> > > > >> > > > > other developers on this issue.
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > >
> > > > >> > > > > > That's why I mentioned a few alternatives that, while
> more
> > > > >> complex
> > > > >> > to
> > > > >> > > > > > implement, would provide a more consistent behavior with
> > > > simple
> > > > >> > rules
> > > > >> > > > for
> > > > >> > > > > > the users.
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > > > I am open to discuss alternative solutions that can
> address
> > > the
> > > > >> the
> > > > >> > > > problem
> > > > >> > > > > in a better manner. I am not opposed to complexity as long
> > as
> > > it
> > > > >> > gives
> > > > >> > > us
> > > > >> > > > > good long term benefits.
> > > > >> > > > >
> > > > >> > > > > Here are my current concern with the three alternatives
> you
> > > > >> described
> > > > >> > > > > earlier:
> > > > >> > > > >
> > > > >> > > > > - The first alternative requires support from input system
> > > which
> > > > >> is
> > > > >> > > > > currently not available. It will limit the usage of
> > partition
> > > > >> > expansion
> > > > >> > > > to
> > > > >> > > > > only systems that support such interface. And it is not
> > > > guaranteed
> > > > >> > that
> > > > >> > > > we
> > > > >> > > > > can persuade the developer of the input system to add this
> > > > >> interface.
> > > > >> > > > This
> > > > >> > > > > is not desirable for Samza in the long term.
> > > > >> > > > >
> > > > >> > > > > - I can not comment on the second alternative because I
> > don't
> > > > >> > > understand
> > > > >> > > > > how it reshuffles all existing changelog data. We can
> > discuss
> > > > >> more if
> > > > >> > > > there
> > > > >> > > > > is more specific detail. My gut feel is that this will be
> > > > complex
> > > > >> and
> > > > >> > > > > carries performance overhead.
> > > > >> > > > >
> > > > >> > > > > - The third alternative requires performance overhead.
> Given
> > > > that
> > > > >> > user
> > > > >> > > > can
> > > > >> > > > > already use this solution to enable partition expansion,
> > maybe
> > > > >> Samza
> > > > >> > > > > developers can provide more input as to why we are not
> doing
> > > it
> > > > by
> > > > >> > > > default.
> > > > >> > > > > My gut feel is that it carries considerable performance
> > > overhead
> > > > >> and
> > > > >> > > > > increases the cost-to-serve Samza job (e.g. disk usage),
> > which
> > > > may
> > > > >> > make
> > > > >> > > > it
> > > > >> > > > > undesirable in the long term.
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > >
> > > > >> > > > > > Yes, we need a similar check for
> > > > GroupBySystemStreamPartitionWi
> > > > >> > > > > > > thFixedTaskNum
> > > > >> > > > > > > as well. If there is more grouper classes needed in
> the
> > > > >> future,
> > > > >> > we
> > > > >> > > > can
> > > > >> > > > > > > solve this problem cleanly without new config. Given
> the
> > > > >> > > > > > > previousGrouperClass and newGrouperClass,
> > > > >> KafkaCheckpointLogKey
> > > > >> > > will
> > > > >> > > > > > throw
> > > > >> > > > > > > exception if and only if newGrouperClass is not an
> instance
> > of
> > > > >> > > > > > > previousGrouperClass.
> > > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum should
> > > extend
> > > > >> > > > > > > GroupBySystemStreamPartition
> > > > >> > > > > > > and GroupByPartitionWithFixedTaskNum should extend
> > > > >> > > GroupByPartition.
> > > > >> > > > > > Does
> > > > >> > > > > > > this address your concern?
> > > > >> > > > > >
> > > > >> > > > > > Sounds workable, thanks.
> > > > >> > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > Can
> > > > >> > > > > > > you be more specific why Partition-to-task mapping is
> > not
> > > > >> > > meaningful
> > > > >> > > > > > > without
> > > > >> > > > > > > some definition of the key-to-partition assignments
> and
> > > why
> > > > >> it is
> > > > >> > > > > > > incomplete and misleading?
> > > > >> > > > > >
> > > > >> > > > > > A partition is (in my naive interpretation) an independent queue for
> > > > >> > > > > > messages of a particular key set. It is not the *identity* of the
> > > > >> > > > > > partition that determines the contents of the associated task's local
> > > > >> > > > > > state. Rather, it is the *contents* of the partition that affect the
> > > > >> > > > > > task's state. A partition-to-task mapping only captures an identity
> > > > >> > > > > > relationship: partition1->task1. Without the assumptions of this SEP,
> > > > >> > > > > > this is insufficient to determine the assignment of keys to tasks,
> > > > >> > > > > > which is what really matters. Therefore, any future feature that
> > > > >> > > > > > utilizes this mapping without accounting for the assumptions of this
> > > > >> > > > > > SEP is likely to malfunction.
> > > > >> > > > > >
> > > > >> > > > > >
> > > > >> > > > > I am not sure it is true that "any future feature that utilizes this
> > > > >> > > > > mapping without accounting for the assumptions of this SEP is likely to
> > > > >> > > > > malfunction". Suppose we allow users to specify a new-to-old-partition
> > > > >> > > > > mapping; then we can use the partition-to-task mapping correctly without
> > > > >> > > > > relying on the assumption in this SEP, right?
> > > > >> > > > >
> > > > >> > > > >
> > > > >> > > > > >
> > > > >> > > > > > On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <
> > > > lindon...@gmail.com>
> > > > >> > > wrote:
> > > > >> > > > > >
> > > > >> > > > > > > Hey Jacob,
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks for the explanation. It seems that your biggest
> > > > >> concern is
> > > > >> > > > with
> > > > >> > > > > > the
> > > > >> > > > > > > generality of the proposal. Let me try to address this
> > and
> > > > >> other
> > > > >> > > > > comments
> > > > >> > > > > > > below.
> > > > >> > > > > > >
> > > > >> > > > > > > 1) ... it will cause headaches for Samza users ...
> > > > >> > > > > > >
> > > > >> > > > > > > I am not sure I understand why this proposal causes
> > > headache
> > > > >> for
> > > > >> > > > Samza
> > > > >> > > > > > > users. Here is the impact of the SEP-5 on users:
> > > > >> > > > > > >
> > > > >> > > > > > > - For users that do not need partition expansion of the input stream,
> > > > >> > > > > > > they can use Samza without any change in code/binary/config. Thus there
> > > > >> > > > > > > is no headache for them.
> > > > >> > > > > > >
> > > > >> > > > > > > - For users that need partition expansion of the input streams for
> > > > >> > > > > > > stateless jobs, they currently need to manually reboot their Samza job
> > > > >> > > > > > > in order to let Samza consume the new partitions created for the stream.
> > > > >> > > > > > > SEP-5 actually reduces their headache by allowing Samza to
> > > > >> > > > > > > automatically detect and consume new partitions.
> > > > >> > > > > > >
> > > > >> > > > > > > - For users that need partition expansion of the input streams for
> > > > >> > > > > > > stateful jobs, they have a really big headache in the sense that Samza
> > > > >> > > > > > > does not allow partition expansion for stateful jobs. SEP-5 addresses
> > > > >> > > > > > > this headache for them.
> > > > >> > > > > > >
> > > > >> > > > > > > You are right that SEP-5 requires users to understand and enforce
> > > > >> > > > > > > limitations across organizations. But it is still much better than not
> > > > >> > > > > > > allowing users to expand partitions for stateful jobs at all, right?
> > > > >> > > > > > > Did I miss something here?
> > > > >> > > > > > >
> > > > >> > > > > > > 2) ... Separate orgs are often difficult to coordinate
> > > and a
> > > > >> > system
> > > > >> > > > > which
> > > > >> > > > > > > depends on such significant process/coordination is
> too
> > > > >> fragile
> > > > >> > for
> > > > >> > > > my
> > > > >> > > > > > > taste ..
> > > > >> > > > > > >
> > > > >> > > > > > > This is true. Ideally we want a system that is fully
> > > > >> > self-serving.
> > > > >> > > I
> > > > >> > > > > > think
> > > > >> > > > > > > this is a long term goal for Samza. Still, for the
> > reasons
> > > > >> > > described
> > > > >> > > > > > above,
> > > > >> > > > > > > I think something is better than nothing. I am open to
> > > > >> > alternative
> > > > >> > > > > design
> > > > >> > > > > > > that can support partition expansion for stateful jobs
> > > > without
> > > > >> > > > > requiring
> > > > >> > > > > > > coordination.
> > > > >> > > > > > >
> > > > >> > > > > > > 3) There is currently no supported way of sharing
> state
> > > > among
> > > > >> the
> > > > >> > > > tasks
> > > > >> > > > > > of
> > > > >> > > > > > > a container.  Each task has its own isolated store and
> > > that
> > > > >> > logical
> > > > >> > > > > > > isolation is the primary thing that enables Samza jobs
> > to
> > > > >> scale
> > > > >> > > with
> > > > >> > > > a
> > > > >> > > > > > > simple container count change. My feeling is that we
> > > should
> > > > >> not
> > > > >> > > > change
> > > > >> > > > > > > this without
> > > > >> > > > > > > good reason.
> > > > >> > > > > > >
> > > > >> > > > > > > I see your point. I will remove this sentence from the
> > > > >> motivation
> > > > >> > > > > > section.
> > > > >> > > > > > > This won't have any impact on the design of the SEP-5.
> > > Does
> > > > >> this
> > > > >> > > > > address
> > > > >> > > > > > > the problem?
> > > > >> > > > > > >
> > > > >> > > > > > > 4) With the current proposal, we'd also need a similar
> > > check
> > > > >> for
> > > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well.
> > And
> > > > if
> > > > >> any
> > > > >> > > > other
> > > > >> > > > > > > groupers
> > > > >> > > > > > > were later added with both these modes, we'd probably
> > need
> > > > to
> > > > >> add
> > > > >> > > > those
> > > > >> > > > > > > too. It might be easier and cleaner to add a config to
> > > > ignore
> > > > >> > that
> > > > >> > > > > check
> > > > >> > > > > > > temporarily. Down side is that it further complicates
> > the
> > > > >> Samza
> > > > >> > > > config,
> > > > >> > > > > > > which is already huge. Thoughts?
> > > > >> > > > > > >
> > > > >> > > > > > > Yes, we need a similar check for
> > > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. If more grouper
> > > > >> > > > > > > classes are needed in the future, we can solve this problem cleanly
> > > > >> > > > > > > without a new config. Given the previousGrouperClass and
> > > > >> > > > > > > newGrouperClass, KafkaCheckpointLogKey will throw an exception if and
> > > > >> > > > > > > only if newGrouperClass is not an instance of previousGrouperClass.
> > > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum should extend
> > > > >> > > > > > > GroupBySystemStreamPartition, and GroupByPartitionWithFixedTaskNum
> > > > >> > > > > > > should extend GroupByPartition. Does this address your concern?
> > > > >> > > > > > >
> > > > >> > > > > > > 5) The task-to-container and container-to-host
> mappings
> > > are
> > > > >> both
> > > > >> > > > > > meaningful
> > > > >> > > > > > > in context of the JobModel. Partition-to-task mapping
> is
> > > not
> > > > >> > > > meaningful
> > > > >> > > > > > > without
> > > > >> > > > > > > some definition of the key-to-partition assignments.
> > It's
> > > > >> > > incomplete
> > > > >> > > > > > > information and therefore misleading. I think it only
> > > makes
> > > > >> sense
> > > > >> > > to
> > > > >> > > > > use
> > > > >> > > > > > > this mapping if we adopt a solution wherein Samza also
> > > knows
> > > > >> the
> > > > >> > > > > > partition
> > > > >> > > > > > > key assignment.
> > > > >> > > > > > >
> > > > >> > > > > > > Partition-to-task is currently explicitly passed from
> > job
> > > > >> > > coordinator
> > > > >> > > > > to
> > > > >> > > > > > > each task as part of the job model to tell tasks which
> > > > >> partitions
> > > > >> > > to
> > > > >> > > > > > > consume from. I think we can store some definition of
> > the
> > > > >> > > > > > key-to-partition
> > > > >> > > > > > > assignments if Samza decides to get and use this
> > > information
> > > > >> in
> > > > >> > the
> > > > >> > > > > > > future. Can
> > > > >> > > > > > > you be more specific why Partition-to-task mapping is
> > not
> > > > >> > > meaningful
> > > > >> > > > > > > without
> > > > >> > > > > > > some definition of the key-to-partition assignments
> and
> > > why
> > > > >> it is
> > > > >> > > > > > > incomplete and misleading?
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks,
> > > > >> > > > > > > Dong
> > > > >> > > > > > >
> > > > >> > > > > > > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <
> > > > >> > jacob.m...@gmail.com>
> > > > >> > > > > > wrote:
> > > > >> > > > > > >
> > > > >> > > > > > > > Hey Dong,
> > > > >> > > > > > > >
> > > > >> > > > > > > > I'm opposed (or a +0, at best) to this limited,
> > > > >> Kafka-specific
> > > > >> > > > > > solution.
> > > > >> > > > > > > I
> > > > >> > > > > > > > understand that the proposal is relatively simple to
> > > > >> implement,
> > > > >> > > > but I
> > > > >> > > > > > > think
> > > > >> > > > > > > > it will cause headaches for Samza users. They will
> not
> > > > only
> > > > >> > have
> > > > >> > > to
> > > > >> > > > > > > > understand all the limitations (increase only,
> double
> > > > >> > partitions
> > > > >> > > > > only,
> > > > >> > > > > > > > partition using hash+modulo, etc) of this approach,
> > but
> > > > >> > enforcing
> > > > >> > > > > these
> > > > >> > > > > > > > limitations can be a major problem, especially when
> > the
> > > > >> Samza
> > > > >> > > jobs
> > > > >> > > > > and
> > > > >> > > > > > > > message brokers are managed by separate orgs in a
> > > company.
> > > > >> > > Separate
> > > > >> > > > > > orgs
> > > > >> > > > > > > > are often difficult to coordinate and a system which
> > > > >> depends on
> > > > >> > > > such
> > > > >> > > > > > > > significant process/coordination is too fragile for
> my
> > > > >> taste.
> > > > >> > > > > > > >
> > > > >> > > > > > > > That said, I realize that my opinion is just one of
> > many
> > > > in
> > > > >> the
> > > > >> > > > > broader
> > > > >> > > > > > > > community which may feel differently, so let me
> > respond
> > > to
> > > > >> some
> > > > >> > > of
> > > > >> > > > > the
> > > > >> > > > > > > > other items in the discussion so we can clear them
> up:
> > > > >> > > > > > > >
> > > > >> > > > > > > > > The task-to-container assignment matters because the correlated tasks
> > > > >> > > > > > > > > (i.e. tasks that consume messages with the same key) need to be in the
> > > > >> > > > > > > > > same container so that they can share the same
> > > key/value
> > > > >> > local
> > > > >> > > > > store
> > > > >> > > > > > on
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > same physical machine.
> > > > >> > > > > > > >
> > > > >> > > > > > > > There is currently no supported way of sharing state
> > > among
> > > > >> the
> > > > >> > > > tasks
> > > > >> > > > > > of a
> > > > >> > > > > > > > container.  Each task has its own isolated store and
> > > that
> > > > >> > logical
> > > > >> > > > > > > isolation
> > > > >> > > > > > > > is the primary thing that enables Samza jobs to
> scale
> > > > with a
> > > > >> > > simple
> > > > >> > > > > > > > container count change. My feeling is that we should
> > not
> > > > >> change
> > > > >> > > > this
> > > > >> > > > > > > > without good reason.
> > > > >> > > > > > > >
> > > > >> > > > > > > > I think we can hardcode new logic in
> > > > >> > KafkaCheckpointLogKey.scala
> > > > >> > > > such
> > > > >> > > > > > > that
> > > > >> > > > > > > > > exception will not be thrown if new grouper is
> > > > >> > > > > > > > > GroupByPartitionWithFixedTaskNum and old grouper
> is
> > > > >> > > > > > GroupByPartition.
> > > > >> > > > > > > > Does
> > > > >> > > > > > > > > this look reasonable?
> > > > >> > > > > > > >
> > > > >> > > > > > > > With the current proposal, we'd also need a similar
> > > check
> > > > >> for
> > > > >> > > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as
> well.
> > > And
> > > > >> if
> > > > >> > any
> > > > >> > > > > other
> > > > >> > > > > > > > groupers were later added with both these modes,
> we'd
> > > > >> probably
> > > > >> > > need
> > > > >> > > > > to
> > > > >> > > > > > > add
> > > > >> > > > > > > > those too. It might be easier and cleaner to add a
> > > config
> > > > to
> > > > >> > > ignore
> > > > >> > > > > > that
> > > > >> > > > > > > > check temporarily. Down side is that it further
> > > > complicates
> > > > >> the
> > > > >> > > > Samza
> > > > >> > > > > > > > config, which is already huge. Thoughts?
> > > > >> > > > > > > >
> > > > >> > > > > > > > I think storing the previous task-to-partition
> mapping
> > > is
> > > > >> more
> > > > >> > > > > general
> > > > >> > > > > > > than
> > > > >> > > > > > > > > storing the partition count of all topics for the
> > > > >> following
> > > > >> > > > > reasons:
> > > > >> > > > > > > > > - Samza already stores the task-to-container
> mapping
> > > and
> > > > >> > > > > > > > container-to-host
> > > > >> > > > > > > > > mapping in the coordinator stream. It seems
> > consistent
> > > > to
> > > > >> > also
> > > > >> > > > > store
> > > > >> > > > > > > the
> > > > >> > > > > > > > > partition-to-task mapping. And this information
> may
> > be
> > > > >> useful
> > > > >> > > for
> > > > >> > > > > > other
> > > > >> > > > > > > > > use-case such as debugging.
> > > > >> > > > > > > > > - By having the new interface take the previous
> > > > >> > > task-to-partition
> > > > >> > > > > > > > > assignment instead of a topic-to-partition-count
> > > mapping
> > > > >> as
> > > > >> > new
> > > > >> > > > > > > > parameter,
> > > > >> > > > > > > > > we can potentially have grouper implementation to
> > > > support
> > > > >> > other
> > > > >> > > > > types
> > > > >> > > > > > > of
> > > > >> > > > > > > > > input systems.
> > > > >> > > > > > > > > - It is slightly simpler to store the
> > task-to-partition
> > > > >> > > assignment
> > > > >> > > > > > > because
> > > > >> > > > > > > > > we don't need to know whether this is the first
> > time a
> > > > >> job is
> > > > >> > > > > started
> > > > >> > > > > > > or
> > > > >> > > > > > > > > not. On the other hand, you can write
> > > > >> > topic-to-partition-count
> > > > >> > > > > > mapping
> > > > >> > > > > > > to
> > > > >> > > > > > > > > the coordinator stream only if this is the first
> > time
> > > > the
> > > > >> job
> > > > >> > > is
> > > > >> > > > > run
> > > > >> > > > > > > >
> > > > >> > > > > > > > The task-to-container and container-to-host mappings
> > are
> > > > >> both
> > > > >> > > > > > meaningful
> > > > >> > > > > > > in
> > > > >> > > > > > > > context of the JobModel. Partition-to-task mapping
> is
> > > not
> > > > >> > > > meaningful
> > > > >> > > > > > > > without some definition of the key-to-partition
> > > > assignments.
> > > > >> > It's
> > > > >> > > > > > > > incomplete information and therefore misleading. I
> > think
> > > > it
> > > > >> > only
> > > > >> > > > > makes
> > > > >> > > > > > > > sense to use this mapping if we adopt a solution
> > wherein
> > > > >> Samza
> > > > >> > > also
> > > > >> > > > > > knows
> > > > >> > > > > > > > the partition key assignment.
> > > > >> > > > > > > >
> > > > >> > > > > > > > -Jake
> > > > >> > > > > > > >
> > > > >> > > > > > > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <
> > > > >> lindon...@gmail.com
> > > > >> > >
> > > > >> > > > > wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > > > Hey Jacob,
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Thanks for taking time to review the SEP.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > I agree with you and Navina that the current SEP
> > > doesn't
> > > > >> > > provide
> > > > >> > > > > > > support
> > > > >> > > > > > > > to
> > > > >> > > > > > > > > arbitrary input systems and it doesn't support
> > > partition
> > > > >> > > shrink.
> > > > >> > > > I
> > > > >> > > > > > > think
> > > > >> > > > > > > > > the scope of this SEP is to support partition
> > > expansion
> > > > >> for
> > > > >> > > Kafka
> > > > >> > > > > > (the
> > > > >> > > > > > > > most
> > > > >> > > > > > > > > widely used input system of Samza) and keep the
> door
> > > > open
> > > > >> for
> > > > >> > > > > > partition
> > > > >> > > > > > > > > support of various input systems. The current
> design
> > > can
> > > > >> > > support
> > > > >> > > > > any
> > > > >> > > > > > > > system
> > > > >> > > > > > > > > that meets the two operational requirements
> specified
> > > in
> > > > >> the
> > > > >> > > doc.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > While it is possible to support more types of
> input
> > > > >> systems,
> > > > >> > it
> > > > >> > > > > will
> > > > >> > > > > > > > likely
> > > > >> > > > > > > > > add more complexity to the design. For example,
> the
> > > > first
> > > > >> > > > > alternative
> > > > >> > > > > > > > > solution from you requires broker-side support to
> > > > >> negotiate
> > > > >> > > hash
> > > > >> > > > > > > > algorithm.
> > > > >> > > > > > > > > The second alternative solution requires changelog
> > > > >> partition
> > > > >> > > > > > reshuffle
> > > > >> > > > > > > > > which carries its own design complexity and
> > > performance
> > > > >> > > overhead.
> > > > >> > > > > > There
> > > > >> > > > > > > > is
> > > > >> > > > > > > > > a tradeoff between the generality and the complexity
> > > among
> > > > >> > these
> > > > >> > > > > > > choices. I
> > > > >> > > > > > > > > like the current design because it is simple and
> > > > >> addresses a
> > > > >> > > big
> > > > >> > > > > > usage
> > > > >> > > > > > > > > scenario for us. We can add more complexity to
> > > > generalize
> > > > >> the
> > > > >> > > > > design
> > > > >> > > > > > if
> > > > >> > > > > > > > it
> > > > >> > > > > > > > > enables important use-case. Does this sound
> > > reasonable?
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Note that the "Rejected Alternative" section also
> > > > mentions
> > > > >> > the
> > > > >> > > > > > > > possibility
> > > > >> > > > > > > > > of supporting a wider range of input systems by
> > > allowing
> > > > >> user
> > > > >> > > to
> > > > >> > > > > > > specify
> > > > >> > > > > > > > > the new-partition to old-partition mapping. We are
> > not
> > > > >> doing
> > > > >> > it
> > > > >> > > > > > because
> > > > >> > > > > > > > 1)
> > > > >> > > > > > > > > we may have better understanding of the design
> after
> > > we
> > > > >> have
> > > > >> > a
> > > > >> > > > > > specific
> > > > >> > > > > > > > > second input system to support 2) the current
> design
> > > can
> > > > >> be
> > > > >> > > > > extended
> > > > >> > > > > > to
> > > > >> > > > > > > > > support general input systems. I think similar
> > > argument
> > > > >> can
> > > > >> > be
> > > > >> > > > > > applied
> > > > >> > > > > > > > > to explain why we don't have to support general input
> > > > systems
> > > > >> > > using
> > > > >> > > > > the
> > > > >> > > > > > > > > potentially-good alternatives you mentioned.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > I hope SEP-5 can be an important first-step
> towards
> > > > >> > supporting
> > > > >> > > > > > > partition
> > > > >> > > > > > > > > expansion of any input system.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > To answer your questions about the current
> proposal:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > >1. "An alternative solution is to allow task
> number
> > > to
> > > > >> > > increase
> > > > >> > > > > > after
> > > > >> > > > > > > > > >partition expansion and uses a proper
> > > task-to-container
> > > > >> > > > assignment
> > > > >> > > > > > to
> > > > >> > > > > > > > make
> > > > >> > > > > > > > > >sure the Samza output is correct." What does the
> > > > >> container
> > > > >> > > have
> > > > >> > > > to
> > > > >> > > > > > do
> > > > >> > > > > > > > with
> > > > >> > > > > > > > > >stateful processing or output in general?
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > The task-to-container assignment matters because the correlated tasks
> > > > >> > > > > > > > > (i.e. tasks that consume messages with the same key) need to be in the
> > > > >> > > > > > > > > same container so that they can share the same
> > > key/value
> > > > >> > local
> > > > >> > > > > store
> > > > >> > > > > > on
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > same physical machine.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > >2. When you use "Join" as an example, you
> basically
> > > > mean
> > > > >> > > > multiple
> > > > >> > > > > > > > > >co-partitioned streams, right? This is opposed to
> > > > >> multiple,
> > > > >> > > > > > > > > >independently-partitioned streams or a single
> > stream.
> > > > >> Would
> > > > >> > be
> > > > >> > > > > nice
> > > > >> > > > > > to
> > > > >> > > > > > > > > >formulate the proposal in these more general
> terms.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > I thought "join" is commonly used to refer to the join operation with
> > > > >> > > > > > > > > co-partitioned stream but I may be wrong. I have
> > > updated
> > > > >> the
> > > > >> > > wiki
> > > > >> > > > > to
> > > > >> > > > > > > > > explicitly mention "co-partitioned stream". Does
> > this
> > > > look
> > > > >> > > better
> > > > >> > > > > > now?
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > >3. When switching SSP groupers, how will the
> users
> > > > avoid
> > > > >> the
> > > > >> > > > > > > > > >org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > > > >> > > > > > > > > >exception?
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > I think we can hardcode new logic in
> > > > >> > > KafkaCheckpointLogKey.scala
> > > > >> > > > > such
> > > > >> > > > > > > > that
> > > > >> > > > > > > > > exception will not be thrown if new grouper is
> > > > >> > > > > > > > > GroupByPartitionWithFixedTaskNum and old grouper
> is
> > > > >> > > > > > GroupByPartition.
> > > > >> > > > > > > > Does
> > > > >> > > > > > > > > this look reasonable?
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > >4. Partition to task assignment is meaningless
> > > without
> > > > >> key
> > > > >> > to
> > > > >> > > > > > > partition
> > > > >> > > > > > > > > >mapping. The real semantics are captured in the
> > > > external
> > > > >> > > > > requirement
> > > > >> > > > > > > for
> > > > >> > > > > > > > > >partitioning via hash+modulo. But in that case,
> > iiuc,
> > > > >> only
> > > > >> > the
> > > > >> > > > > > > partition
> > > > >> > > > > > > > > >count matters. So why not just store the original
> > > > >> partition
> > > > >> > > > count
> > > > >> > > > > > > rather
> > > > >> > > > > > > > > >than the whole mapping?
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > I think storing the previous task-to-partition
> > mapping
> > > > is
> > > > >> > more
> > > > >> > > > > > general
> > > > >> > > > > > > > than
> > > > >> > > > > > > > > storing the partition count of all topics for the
> > > > >> following
> > > > >> > > > > reasons:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > - Samza already stores the task-to-container
> mapping
> > > and
> > > > >> > > > > > > > container-to-host
> > > > >> > > > > > > > > mapping in the coordinator stream. It seems
> > consistent
> > > > to
> > > > >> > also
> > > > >> > > > > store
> > > > >> > > > > > > the
> > > > >> > > > > > > > > partition-to-task mapping. And this information
> may
> > be
> > > > >> useful
> > > > >> > > for
> > > > >> > > > > > other
> > > > >> > > > > > > > > use-case such as debugging.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > - By having the new interface take the previous
> > > > >> > > task-to-partition
> > > > >> > > > > > > > > assignment instead of a topic-to-partition-count
> > > mapping
> > > > >> as
> > > > >> > new
> > > > >> > > > > > > > parameter,
> > > > >> > > > > > > > > we can potentially have grouper implementation to
> > > > support
> > > > >> > other
> > > > >> > > > > types
> > > > >> > > > > > > of
> > > > >> > > > > > > > > input systems.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > - It is slightly simpler to store the
> > task-to-partition
> > > > >> > > assignment
> > > > >> > > > > > > because
> > > > >> > > > > > > > > we don't need to know whether this is the first
> > time a
> > > > >> job is
> > > > >> > > > > started
> > > > >> > > > > > > or
> > > > >> > > > > > > > > not. On the other hand, you can write
> > > > >> > topic-to-partition-count
> > > > >> > > > > > mapping
> > > > >> > > > > > > to
> > > > >> > > > > > > > > the coordinator stream only if this is the first
> > time
> > > > the
> > > > >> job
> > > > >> > > is
> > > > >> > > > > run
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > Thanks,
> > > > >> > > > > > > > > Dong
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <
> > > > >> > > > jacob.m...@gmail.com>
> > > > >> > > > > > > > wrote:
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > > Hey Dong,
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Thanks for the SEP. Supporting partition changes
> > is
> > > > >> > > critically
> > > > >> > > > > > > > important
> > > > >> > > > > > > > > > for stateful Samza jobs, so it's great to see
> some
> > > > >> ideas on
> > > > >> > > > that
> > > > >> > > > > > > front!
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Sorry for the late feedback, but I have a few
> > > thoughts
> > > > >> to
> > > > >> > > > > > contribute.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Big +1 on Navina's comment:
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > My biggest gripe with this SEP is that it
> seems
> > > > like a
> > > > >> > > > > > tailor-made
> > > > >> > > > > > > > > > > solution
> > > > >> > > > > > > > > > > that relies on the semantics of the Kafka
> system
> > > and
> > > > >> yet,
> > > > >> > > we
> > > > >> > > > > are
> > > > >> > > > > > > > trying
> > > > >> > > > > > > > > > to
> > > > >> > > > > > > > > > > masquerade that as operational requirements
> for
> > > > other
> > > > >> > > systems
> > > > >> > > > > > > > > interacting
> > > > >> > > > > > > > > > > with Samza. (Not to say that this is the first
> > > time
> > > > >> such
> > > > >> > a
> > > > >> > > > > choice
> > > > >> > > > > > > is
> > > > >> > > > > > > > > > being
> > > > >> > > > > > > > > > > made in the Samza design). I am not seeing how
> > > this
> > > > >> can be a
> > > > >> > > > > > "general"
> > > > >> > > > > > > > > > > solution for all input systems. That's my two
> > > > cents. I
> > > > >> > > would
> > > > >> > > > > like
> > > > >> > > > > > > to
> > > > >> > > > > > > > > hear
> > > > >> > > > > > > > > > > alternative points of view for this from other
> > > devs.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Two examples of this:
> > > > >> > > > > > > > > > 1. This is mostly a hypothetical, but some
> message
> > > > >> brokers
> > > > >> > > may
> > > > >> > > > > use
> > > > >> > > > > > > key
> > > > >> > > > > > > > > > range assignment rather than hash+modulo.
> > > > >> > > > > > > > > > 2. Kafka can't reduce the number of partitions,
> > but
> > > it
> > > > >> can
> > > > >> > > > happen
> > > > >> > > > > > on
> > > > >> > > > > > > > > other
> > > > >> > > > > > > > > > systems. For example, it may be cheaper to
> reduce
> > > the
> > > > >> > number
> > > > >> > > of
> > > > >> > > > > > > > > partitions
> > > > >> > > > > > > > > > on a hosted service where the cost model depends
> > on
> > > > the
> > > > >> > > number
> > > > >> > > > of
> > > > >> > > > > > > > > > partitions/shards.
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > It seems to me that we need a solution which doesn't
> > depend
> > > on
> > > > >> > > > partition
> > > > >> > > > > > key
> > > > >> > > > > > > > > > assignment in the message broker. Here are a few
> > > > >> > alternatives
> > > > >> > > > > that
> > > > >> > > > > > > > > weren't
> > > > >> > > > > > > > > > discussed and I think should be considered:
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Alternatives in order of increasing preference:
> > > > >> > > > > > > > > > 1. Samza manages the partition hash (via some
> new
> > > > >> contract
> > > > >> > > with
> > > > >> > > > > the
> > > > >> > > > > > > > > > brokers) and guarantees correct routing of keys
> > > among
> > > > >> the
> > > > >> > new
> > > > >> > > > > > > > partitions.
> > > > >> > > > > > > > > > 2. Samza detects a task count change, creates a
> > new
> > > > >> > changelog
> > > > >> > > > > with
> > > > >> > > > > > > > > correct
> > > > >> > > > > > > > > > partitions, and *somehow* reshuffles all
> existing
> > > > >> changelog
> > > > >> > > > data
> > > > >> > > > > > into
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > > new topic and then uses the new topic from then
> > on.
> > > > >> > (doesn't
> > > > >> > > > work
> > > > >> > > > > > > > without
> > > > >> > > > > > > > > > changelog, but in that case durability isn't
> > > > paramount,
> > > > >> so
> > > > >> > we
> > > > >> > > > can
> > > > >> > > > > > > just
> > > > >> > > > > > > > > > wipe)
> > > > >> > > > > > > > > > 3. Use RPC in between stages and samza fully
> > manages
> > > > key
> > > > >> > > > > assignment
> > > > >> > > > > > > > among
> > > > >> > > > > > > > > > tasks. No on-disk topic data to clean up.
> > Mandatory
> > > > >> > > > > repartitioning
> > > > >> > > > > > in
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > > first stage to pre-scaled tasks in next stage.
> > > > >> > > > > > > > > > 4. Combined 2-3 solution
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > Finally, some questions about the current
> > proposal:
> > > > >> > > > > > > > > > 1. "An alternative solution is to allow task
> > number
> > > to
> > > > >> > > increase
> > > > >> > > > > > after
> > > > >> > > > > > > > > > partition expansion and uses a proper
> > > > task-to-container
> > > > >> > > > > assignment
> > > > >> > > > > > to
> > > > >> > > > > > > > > make
> > > > >> > > > > > > > > > sure the Samza output is correct." What does the
> > > > >> container
> > > > >> > > have
> > > > >> > > > > to
> > > > >> > > > > > do
> > > > >> > > > > > > > > with
> > > > >> > > > > > > > > > stateful processing or output in general?
> > > > >> > > > > > > > > > 2. When you use "Join" as an example, you
> > basically
> > > > mean
> > > > >> > > > multiple
> > > > >> > > > > > > > > > co-partitioned streams, right? This is opposed
> to
> > > > >> multiple,
> > > > >> > > > > > > > > > independently-partitioned streams or a single
> > > stream.
> > > > >> Would
> > > > >> > > be
> > > > >> > > > > nice
> > > > >> > > > > > > to
> > > > >> > > > > > > > > > formulate the proposal in these more general
> > terms.
> > > > >> > > > > > > > > > 3. When switching SSP groupers, how will the
> users
> > > > avoid
> > > > >> > the
> > > > >> > > > > > > > > > > org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > > > >> > > > > > > > > > exception?
> > > > >> > > > > > > > > > 4. Partition to task assignment is meaningless
> > > without
> > > > >> key
> > > > >> > to
> > > > >> > > > > > > partition
> > > > >> > > > > > > > > > mapping. The real semantics are captured in the
> > > > external
> > > > >> > > > > > requirement
> > > > >> > > > > > > > for
> > > > >> > > > > > > > > > partitioning via hash+modulo. But in that case,
> > > iiuc,
> > > > >> only
> > > > >> > > the
> > > > >> > > > > > > > partition
> > > > >> > > > > > > > > > count matters. So why not just store the
> original
> > > > >> partition
> > > > >> > > > count
> > > > >> > > > > > > > rather
> > > > >> > > > > > > > > > than the whole mapping?
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > -Jake
> > > > >> > > > > > > > > >
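Jake's fourth question above turns on a property of hash+modulo partitioning that is easy to check numerically. The sketch below is illustrative only and is not Samza or Kafka code: `partition_for` and `original_partition` are hypothetical helpers, CRC32 stands in for whatever hash the producer actually uses, and the expanded partition count is assumed to be a multiple of the original count.

```python
import zlib


def partition_for(key: str, partition_count: int) -> int:
    """Hypothetical hash+modulo partitioner (CRC32 is a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % partition_count


def original_partition(new_partition: int, original_count: int) -> int:
    """Map a post-expansion partition index back to the pre-expansion index."""
    return new_partition % original_count


original_count = 4
expanded_count = 8  # assumption: a multiple of original_count

keys = [f"page-{i}" for i in range(1000)]

# For every key, folding its new partition back with the original count
# yields the partition the key lived in before the expansion.
consistent = all(
    original_partition(partition_for(k, expanded_count), original_count)
    == partition_for(k, original_count)
    for k in keys
)
print(consistent)
```

Under these assumptions only the original partition count is needed to recover where a key used to live, which is the point behind the question about storing the count rather than the whole mapping.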
> > > > >> > > > > > > > > > On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <
> > > > >> > > lindon...@gmail.com
> > > > >> > > > >
> > > > >> > > > > > > wrote:
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > Hey Yi, Navina,
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > I have updated the SEP-5 document based on our
> > > > >> > discussion.
> > > > >> > > > The
> > > > >> > > > > > > > > difference
> > > > >> > > > > > > > > > > can be found here
> > > > >> > > > > > > > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>.
> > > > >> > > > > > > > > > > Here is the summary of changes:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > - Add new interface that extends the existing
> > > > >> interface
> > > > >> > > > > > > > > > > SystemStreamPartitionGrouper. Newly-added
> > grouper
> > > > >> class
> > > > >> > > > should
> > > > >> > > > > > > > > implement
> > > > >> > > > > > > > > > > this interface.
> > > > >> > > > > > > > > > > - Explained in the Rejected Alternative
> Section
> > > why
> > > > we
> > > > >> > > don't
> > > > >> > > > > add
> > > > >> > > > > > > new
> > > > >> > > > > > > > > > method
> > > > >> > > > > > > > > > > in the existing interface
> > > > >> > > > > > > > > > > - Explained in the Rejected Alternative
> Section
> > > why
> > > > we
> > > > >> > > don't
> > > > >> > > > > > > > > config/class
> > > > >> > > > > > > > > > > for user to specify new-partition to
> > old-partition
> > > > >> > mapping.
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Can you take another look at the proposal and
> > let
> > > me
> > > > >> know
> > > > >> > > if
> > > > >> > > > > > there
> > > > >> > > > > > > is
> > > > >> > > > > > > > > any
> > > > >> > > > > > > > > > > concern?
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > Cheers,
> > > > >> > > > > > > > > > > Dong
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <
> > > > >> > > > lindon...@gmail.com
> > > > >> > > > > >
> > > > >> > > > > > > > wrote:
> > > > >> > > > > > > > > > >
> > > > >> > > > > > > > > > > > Hey Yi,
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Thanks much for the comment. I have updated
> > the
> > > > doc
> > > > >> to
> > > > >> > > > > address
> > > > >> > > > > > > all
> > > > >> > > > > > > > > your
> > > > >> > > > > > > > > > > > comments except the one related to the
> > > interface.
> > > > I
> > > > >> am
> > > > >> > > not
> > > > >> > > > > > sure I
> > > > >> > > > > > > > > > > > understand your suggestion of the new
> > interface.
> > > > >> Will
> > > > >> > > > discuss
> > > > >> > > > > > > > > tomorrow.
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > Thanks,
> > > > >> > > > > > > > > > > > Dong
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <
> > > > >> > > > nickpa...@gmail.com
> > > > >> > > > > >
> > > > >> > > > > > > > wrote:
> > > > >> > > > > > > > > > > >
> > > > >> > > > > > > > > > > >> Hi, Dong,
> > > > >> > > > > > > > > > > >>
> > > > >> > > > > > > > > > > >> Thanks for the detailed design doc for a
> > > > >> long-awaited
> > > > >> > > > feature
> > > > >> > > > > > in
> > > > >> > > > > > > > > Samza!
> > > > >> > > > > > > > > > > >> Really appreciate it! I did a quick pass
> and
> > > have
> > > > >> the
> > > > >> > > > > > following
> > > > >> > > > > > > > > > > comments:
> > > > >> > > > > > > > > > > >>
> > > > >> > > > > > > > > > > >> - minor: "limit the maximum size of
> > partition"
> > > > ==>
> > > > >> > > "limit
> > > > >> > > > > the
> > > > >> > > > > > > > > maximum
> > > > >> > > > > > > > > > > size
> > > > >> > > > > > > > > > > >> of each partition"
> > > > >> > > > > > > > > > > >> - "However, Samza currently is not able to
> > > handle
> > > > >> > > > partition
> > > > >> > > > > > > > > expansion
> > > > >> > > > > > > > > > of
> > > > >> > > > > > > > > > > >> the input streams"==>better point out "for
> > > > stateful
> > > > >> > > jobs".
> > > > >> > > > > For
> > > > >> > > > > > > > > > stateless
> > > > >> > > > > > > > > > > >> jobs, simply bouncing the job now can pick
> up
> > > the
> > > > >> new
> > > > >> > > > > > > partitions.
> > > > >> > > > > > > > > > > >> - "it is possible (e.g. with Kafka) that
> > > messages
> > > > >> > with a
> > > > >> > > > > given
> > > > >> > > > > > > key
> > > > >> > > > > > > > > > > exists
> > > > >> > > > > > > > > > > >> in both partition 1 and 3. Because
> > > > GroupByPartition
> > > > >> > will
> > > > >> > > > > assign
> > > > >> > > > > > > > > > > partition 1
> > > > >> > > > > > > > > > > >> and 3 to different tasks, messages with the
> > > same
> > > > >> key
> > > > >> > may
> > > > >> > > > be
> > > > >> > > > > > > > handled
> > > > >> > > > > > > > > by
> > > > >> > > > > > > > > > > >> different task/container/process and their
> > > state
> > > > >> will
> > > > >> > be
> > > > >> > > > > > stored
> > > > >> > > > > > > in
> > > > >> > > > > > > > > > > >> different changelog partition." The problem
> > > > >> statement
> > > > >> > is
> > > > >> > > > not
> > > > >> > > > > > > super
> > > > >> > > > > > > > > > clear
> > > > >> > > > > > > > > > > >> here. The issue with stateful jobs is:
> after
> > > > >> > > > > GroupByPartition
> > > > >> > > > > > > > > assigns
> > > > >> > > > > > > > > > > >> partition 1 and 3 to different tasks, the
> new
> > > > task
> > > > >> > > > handling
> > > > >> > > > > > > > > partition
> > > > >> > > > > > > > > > 3
> > > > >> > > > > > > > > > > >> does not have the previous state to resume
> > the
> > > > >> work.
> > > > >> > > e.g.
> > > > >> > > > a
> > > > >> > > > > > > > page-key
> > > > >> > > > > > > > > > > based
> > > > >> > > > > > > > > > > >> counter would start from 0 in the new task
> > for
> > > a
> > > > >> > > specific
> > > > >> > > > > key,
> > > > >> > > > > > > > > instead
> > > > >> > > > > > > > > > > of
> > > > >> > > > > > > > > > > >> resuming the previous count 50 held by task
> > 1.
> > > > >> > > > > > > > > > > >> - minor rewording: "the first solution in
> > this
> > > > doc"
> > > > >> > ==>
> > > > >> > > > "the
> > > > >> > > > > > > > > solution
> > > > >> > > > > > > > > > > >> proposed in this doc"
> > > > >> > > > > > > > > > > >> - "Thus additional development work is
> needed
> > > in
> > > > >> Kafka
> > > > >> > > to
> > > > >> > > > > meet
> > > > >> > > > > > > > this
> > > > >> > > > > > > > > > > >> requirement" It would be good to link to a
> > KIP
> > > if
> > > > >> and
> > > > >> > > when
> > > > >> > > > > it
> > > > >> > > > > > > > exists
> > > > >> > > > > > > > > > > >> - Instead of touching/deprecating the
> > interface
> > > > >> > > > > > > > > > > >> SystemStreamPartitionGrouper, I would
> > recommend
> > > > to
> > > > >> > have
> > > > >> > > a
> > > > >> > > > > > > > different
> > > > >> > > > > > > > > > > >> implementation class of the interface,
> which
> > in
> > > > the
> > > > >> > > > > > constructor
> > > > >> > > > > > > of
> > > > >> > > > > > > > > the
> > > > >> > > > > > > > > > > >> grouper, takes two parameters: a) the
> > previous
> > > > task
> > > > >> > > number
> > > > >> > > > > > read
> > > > >> > > > > > > > from
> > > > >> > > > > > > > > > the
> > > > >> > > > > > > > > > > >> coordinator stream; b) the configured
> > > > >> new-partition to
> > > > >> > > > > > > > old-partition
> > > > >> > > > > > > > > > > >> mapping policy. Then, the grouper's
> interface
> > > > >> method
> > > > >> > > stays
> > > > >> > > > > the
> > > > >> > > > > > > > same
> > > > >> > > > > > > > > > and
> > > > >> > > > > > > > > > > >> the
> > > > >> > > > > > > > > > > >> behavior of the grouper is more
> configurable
> > > > which
> > > > >> is
> > > > >> > > good
> > > > >> > > > > to
> > > > >> > > > > > > > > support
> > > > >> > > > > > > > > > a
> > > > >> > > > > > > > > > > >> broader set of use cases in addition to
> > Kafka's
> > > > >> > built-in
> > > > >> > > > > > > partition
> > > > >> > > > > > > > > > > >> expansion policies.
> > > > >> > > > > > > > > > > >> - Minor renaming suggestion to the new
> > grouper
> > > > >> class
> > > > >> > > > names:
> > > > >> > > > > > > > > > > >> GroupByPartitionWithFixedTaskNum
> > > > >> > > > > > > > > > > >> and GroupBySystemStreamPartitionWithFixedTaskNum
> > > > >> > > > > > > > > > > >>
> > > > >> > > > > > > > > > > >> Thanks!
> > > > >> > > > > > > > > > > >>
> > > > >> > > > > > > > > > > >> - Yi
> > > > >> > > > > > > > > > > >>
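Yi's suggestion above, a grouper constructed with the previous task count and a new-partition to old-partition mapping policy, might be sketched as follows. This is a hypothetical illustration, not the SEP-5 implementation or the Samza `SystemStreamPartitionGrouper` API: the function name, the `task-N` naming, and the default modulo policy are all assumptions.

```python
from collections import defaultdict
from typing import Callable, Dict, List


def group_with_fixed_tasks(
    partitions: List[int],
    previous_task_count: int,
    old_partition_of: Callable[[int, int], int] = lambda p, n: p % n,
) -> Dict[str, List[int]]:
    """Assign each partition to the task that owned its pre-expansion partition.

    `old_partition_of` is the pluggable new-partition -> old-partition policy;
    the default (modulo) is an assumption that matches hash+modulo producers.
    """
    groups: Dict[str, List[int]] = defaultdict(list)
    for p in sorted(partitions):
        task = f"task-{old_partition_of(p, previous_task_count)}"
        groups[task].append(p)
    return dict(groups)


# Two tasks existed before the stream grew from 2 to 4 partitions.
assignment = group_with_fixed_tasks([0, 1, 2, 3], previous_task_count=2)
print(assignment)
```

With this policy, partitions 1 and 3 land on the same task, so a per-key counter held by that task keeps its previous value instead of restarting from 0 in a new task.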
> > > > >> > > > > > > > > > > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin
> <
> > > > >> > > > > > lindon...@gmail.com
> > > > >> > > > > > > >
> > > > >> > > > > > > > > > wrote:
> > > > >> > > > > > > > > > > >>
> > > > >> > > > > > > > > > > >> > Hey Navina,
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > Thanks much for the comment. Please see
> my
> > > > >> response
> > > > >> > > > below.
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > Regarding your biggest gripe with the
> SEP,
> > I
> > > > >> > > personally
> > > > >> > > > > > think
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > > > >> > operational requirements proposed in the
> KIP
> > > are
> > > > >> > pretty
> > > > >> > > > > > general
> > > > >> > > > > > > > and
> > > > >> > > > > > > > > > > >> could be
> > > > >> > > > > > > > > > > >> > easily enforced by other systems. The
> > reason
> > > is
> > > > >> that
> > > > >> > > the
> > > > >> > > > > > > modulo
> > > > >> > > > > > > > > > > >> operation
> > > > >> > > > > > > > > > > >> > is pretty standard and the default option
> > > when
> > > > we
> > > > >> > > choose
> > > > >> > > > > > > > > partition.
> > > > >> > > > > > > > > > > And
> > > > >> > > > > > > > > > > >> > usually the underlying system allows user
> > to
> > > > >> select
> > > > >> > > > > > arbitrary
> > > > >> > > > > > > > > > > partition
> > > > >> > > > > > > > > > > >> > number if it supports partition
> expansion.
> > Do
> > > > you
> > > > >> > know
> > > > >> > > > any
> > > > >> > > > > > > > system
> > > > >> > > > > > > > > > that
> > > > >> > > > > > > > > > > >> does
> > > > >> > > > > > > > > > > >> > not meet these two requirements?
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > Regarding your comment of the Motivation
> > > > >> section, I
> > > > >> > > > > renamed
> > > > >> > > > > > > the
> > > > >> > > > > > > > > > first
> > > > >> > > > > > > > > > > >> > section as "Problem and Goal" and
> specified
> > > > that
> > > > >> > "*The
> > > > >> > > > > goal
> > > > >> > > > > > of
> > > > >> > > > > > > > > this
> > > > >> > > > > > > > > > > >> > proposal is to enable partition expansion
> > of
> > > > the
> > > > >> > input
> > > > >> > > > > > > > > streams*.". I
> > > > >> > > > > > > > > > > >> also
> > > > >> > > > > > > > > > > >> > put a sentence at the end of the
> Motivation
> > > > >> section
> > > > >> > > that
> > > > >> > > > > > "*The
> > > > >> > > > > > > > > > feature
> > > > >> > > > > > > > > > > >> of
> > > > >> > > > > > > > > > > >> > task expansion is out of the scope of
> this
> > > > >> proposal
> > > > >> > > and
> > > > >> > > > > will
> > > > >> > > > > > > be
> > > > >> > > > > > > > > > > >> addressed
> > > > >> > > > > > > > > > > >> > in a future SEP*". The second paragraph
> in
> > > the
> > > > >> > > > Motivation
> > > > >> > > > > > > > section
> > > > >> > > > > > > > > is
> > > > >> > > > > > > > > > > >> mainly
> > > > >> > > > > > > > > > > >> > used to explain the thinking process that
> > we
> > > > have
> > > > >> > gone
> > > > >> > > > > > > through,
> > > > >> > > > > > > > > what
> > > > >> > > > > > > > > > > >> other
> > > > >> > > > > > > > > > > >> > alternatives we have considered, and what we
> plan
> > > to
> > > > >> do in
> > > > >> > > > Samza
> > > > >> > > > > > in
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > > next
> > > > >> > > > > > > > > > > >> step.
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > To answer your question why increasing
> the
> > > > >> partition
> > > > >> > > > > number
> > > > >> > > > > > > will
> > > > >> > > > > > > > > > > >> increase
> > > > >> > > > > > > > > > > >> > the throughput of the kafka consumer in
> the
> > > > >> > container,
> > > > >> > > > > Kafka
> > > > >> > > > > > > > > > consumer
> > > > >> > > > > > > > > > > >> can
> > > > >> > > > > > > > > > > >> > potentially fetch more data in one
> > > > FetchResponse
> > > > >> > with
> > > > >> > > > more
> > > > >> > > > > > > > > > partitions
> > > > >> > > > > > > > > > > in
> > > > >> > > > > > > > > > > >> > the FetchRequest. This is because we
> limit
> > > the
> > > > >> > maximum
> > > > >> > > > > > amount
> > > > >> > > > > > > of
> > > > >> > > > > > > > > > data
> > > > >> > > > > > > > > > > >> that
> > > > >> > > > > > > > > > > >> > can be fetched for a given partition in the
> > > > >> > > FetchResponse.
> > > > >> > > > > > This
> > > > >> > > > > > > by
> > > > >> > > > > > > > > > > >> default is
> > > > >> > > > > > > > > > > >> > set to 1 MB. And there is a reason that we
> > can
> > > > not
> > > > >> > > > > arbitrarily
> > > > >> > > > > > > > bump
> > > > >> > > > > > > > > up
> > > > >> > > > > > > > > > > >> this
> > > > >> > > > > > > > > > > >> > limit.
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > To answer your question how partition
> > > expansion
> > > > >> in
> > > > >> > > Kafka
> > > > >> > > > > > > impacts
> > > > >> > > > > > > > > the
> > > > >> > > > > > > > > > > >> > clients, Kafka consumer is able to
> > > > automatically
> > > > >> > > detect
> > > > >> > > > > new
> > > > >> > > > > > > > > > partition
> > > > >> > > > > > > > > > > of
> > > > >> > > > > > > > > > > >> > the topic and reassign all (both old and
> > new)
> > > > >> > > partitions
> > > > >> > > > > > > across
> > > > >> > > > > > > > > > > >> consumers
> > > > >> > > > > > > > > > > >> > in the consumer group IF you tell
> consumer
> > > the
> > > > >> topic
> > > > >> > > to
> > > > >> > > > be
> > > > >> > > > > > > > > > subscribed.
> > > > >> > > > > > > > > > > >> But
> > > > >> > > > > > > > > > > >> > consumer in Samza's container uses
> another
> > > way
> > > > of
> > > > >> > > > > > > subscription.
> > > > >> > > > > > > > > > > Instead
> > > > >> > > > > > > > > > > >> of
> > > > >> > > > > > > > > > > >> > subscribing to the topic, the consumer in
> > > > Samza's
> > > > >> > > > > container
> > > > >> > > > > > > > > > subscribes
> > > > >> > > > > > > > > > > >> to
> > > > >> > > > > > > > > > > >> > the specific partitions of the topic. In
> > this
> > > > >> case,
> > > > >> > if
> > > > >> > > > new
> > > > >> > > > > > > > > > partitions
> > > > >> > > > > > > > > > > >> have
> > > > >> > > > > > > > > > > >> > been added, Samza will need to explicitly
> > > > >> subscribe
> > > > >> > to
> > > > >> > > > the
> > > > >> > > > > > new
> > > > >> > > > > > > > > > > >> partitions
> > > > >> > > > > > > > > > > >> > of the topic. The "Handle partition
> > expansion
> > > > >> while
> > > > >> > > > tasks
> > > > >> > > > > > are
> > > > >> > > > > > > > > > running"
> > > > >> > > > > > > > > > > >> > section in the SEP addresses this issue
> in
> > > > Samza
> > > > >> --
> > > > >> > it
> > > > >> > > > > > > > > recalculates
> > > > >> > > > > > > > > > > the
> > > > >> > > > > > > > > > > >> job
> > > > >> > > > > > > > > > > >> > model and restarts the container so that
> > consumer
> > > > can
> > > > >> > > > subscribe
> > > > >> > > > > > to
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > > new
> > > > >> > > > > > > > > > > >> > partitions.
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > I will ask other dev to take a look at
> the
> > > > >> > proposal. I
> > > > >> > > > > will
> > > > >> > > > > > > > start
> > > > >> > > > > > > > > > the
> > > > >> > > > > > > > > > > >> > voting thread tomorrow if there is no
> > further
> > > > >> > concern
> > > > >> > > > with
> > > > >> > > > > > the
> > > > >> > > > > > > > > SEP.
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > Thanks!
> > > > >> > > > > > > > > > > >> > Dong
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > On Wed, May 31, 2017 at 12:01 AM, Navina
> > > Ramesh
> > > > >> > > > (Apache) <
> > > > >> > > > > > > > > > > >> > nav...@apache.org>
> > > > >> > > > > > > > > > > >> > wrote:
> > > > >> > > > > > > > > > > >> >
> > > > >> > > > > > > > > > > >> > > Hey Dong,
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > >  I have updated the motivation
> section
> > to
> > > > >> > clarify
> > > > >> > > > > this.
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > Thanks for updating the motivation.
> > Couple
> > > of
> > > > >> > notes
> > > > >> > > > > here:
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > 1.
> > > > >> > > > > > > > > > > >> > > > "The motivation of increasing
> partition
> > > > >> number
> > > > >> > of
> > > > >> > > > > Kafka
> > > > >> > > > > > > > topic
> > > > >> > > > > > > > > > > >> includes
> > > > >> > > > > > > > > > > >> > 1)
> > > > >> > > > > > > > > > > >> > > limit the maximum size of a partition
> in
> > > > order
> > > > >> to
> > > > >> > > > > improve
> > > > >> > > > > > > > broker
> > > > >> > > > > > > > > > > >> > > performance and 2) increase throughput
> of
> > > > Kafka
> > > > >> > > > consumer
> > > > >> > > > > > in
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > > > Samza
> > > > >> > > > > > > > > > > >> > > container."
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > It's unclear to me how increasing the
> > > > partition
> > > > >> > > number
> > > > >> > > > > > will
> > > > >> > > > > > > > > > increase
> > > > >> > > > > > > > > > > >> the
> > > > >> > > > > > > > > > > >> > > throughput of the kafka consumer in the
> > > > >> container?
> > > > >> > > > > > > > > Theoretically,
> > > > >> > > > > > > > > > > you
> > > > >> > > > > > > > > > > >> > will
> > > > >> > > > > > > > > > > >> > > still be consuming the same amount of
> > data
> > > in
> > > > >> the
> > > > >> > > > > > container,
> > > > >> > > > > > > > > > > >> irrespective
> > > > >> > > > > > > > > > > >> > > of whether it is coming from one
> > partition
> > > or
> > > > >> more
> > > > >> > > > than
> > > > >> > > > > > one
> > > > >> > > > > > > > > > expanded
> > > > >> > > > > > > > > > > >> > > partitions. Can you please explain it
> for
> > > me
> > > > >> here,
> > > > >> > > > what
> > > > >> > > > > > you
> > > > >> > > > > > > > mean
> > > > >> > > > > > > > > > by
> > > > >> > > > > > > > > > > >> that?
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > 2. I believe the second paragraph under
> > > > >> motivation
> > > > >> > > is
> > > > >> > > > > > simply
> > > > >> > > > > > > > > > talking
> > > > >> > > > > > > > > > > >> > about
> > > > >> > > > > > > > > > > >> > > the scope of the current SEP. It will
> be
> > > > >> easier to
> > > > >> > > > read
> > > > >> > > > > if
> > > > >> > > > > > > > what
> > > > >> > > > > > > > > > > >> solution
> > > > >> > > > > > > > > > > >> > is
> > > > >> > > > > > > > > > > >> > > included in this SEP and what is left
> out
> > > as
> > > > >> not
> > > > >> > in
> > > > >> > > > > scope.
> > > > >> > > > > > > > (for
> > > > >> > > > > > > > > > > >> example,
> > > > >> > > > > > > > > > > >> > > expansions for stateful jobs is
> supported
> > > or
> > > > >> not).
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > > We need to persist the
> task-to-sspList
> > > > >> mapping
> > > > >> > in
> > > > >> > > > the
> > > > >> > > > > > > > > > > >> > > coordinator stream so that the job can
> > > derive
> > > > >> the
> > > > >> > > > > original
> > > > >> > > > > > > > > number
> > > > >> > > > > > > > > > of
> > > > >> > > > > > > > > > > >> > > partitions of each input stream
> > regardless
> > > of
> > > > >> how
> > > > >> > > many
> > > > >> > > > > > times
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > > > >> > partition
> > > > >> > > > > > > > > > > >> > > has expanded. Does this make sense?
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > Yes. It does!
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > > I am not sure how this is related to
> > the
> > > > >> > locality
> > > > >> > > > > > though.
> > > > >> > > > > > > > Can
> > > > >> > > > > > > > > > you
> > > > >> > > > > > > > > > > >> > clarify
> > > > >> > > > > > > > > > > >> > > your question if I haven't answered
> your
> > > > >> question?
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > It's not related. I just meant to give
> an
> > > > >> example
> > > > >> > of
> > > > >> > > > yet
> > > > >> > > > > > > > another
> > > > >> > > > > > > > > > > >> > > coordinator message that is persisted.
> > Your
> > > > >> > > > ssp-to-task
> > > > >> > > > > > > > mapping
> > > > >> > > > > > > > > is
> > > > >> > > > > > > > > > > >> > > following a similar pattern for
> > persisting.
> > > > >> Just
> > > > >> > > > wanted
> > > > >> > > > > to
> > > > >> > > > > > > > > clarify
> > > > >> > > > > > > > > > > >> that.
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > > Can you let me know if this, together
> > > with
> > > > >> the
> > > > >> > > > answers
> > > > >> > > > > > in
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > > > >> previous
> > > > >> > > > > > > > > > > >> > > email, addresses all your questions?
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > Yes. I believe you have addressed most
> of
> > > my
> > > > >> > > > questions.
> > > > >> > > > > > > Thanks
> > > > >> > > > > > > > > for
> > > > >> > > > > > > > > > > >> taking
> > > > >> > > > > > > > > > > >> > > time to do that.
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > > Is there specific question you have
> > > > regarding
> > > > >> > > > > partition
> > > > >> > > > > > > > > > > >> > > expansion in Kafka?
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > I guess my questions are on how
> partition
> > > > >> > expansion
> > > > >> > > in
> > > > >> > > > > > Kafka
> > > > >> > > > > > > > > > impacts
> > > > >> > > > > > > > > > > >> the
> > > > >> > > > > > > > > > > >> > > clients. Iiuc, partition expansions are
> > > done
> > > > >> > > manually
> > > > >> > > > in
> > > > >> > > > > > > Kafka
> > > > >> > > > > > > > > > based
> > > > >> > > > > > > > > > > >> on
> > > > >> > > > > > > > > > > >> > the
> > > > >> > > > > > > > > > > >> > > bytes-in rate of the partition. Do the
> > > > existing
> > > > >> > > kafka
> > > > >> > > > > > > clients
> > > > >> > > > > > > > > > handle
> > > > >> > > > > > > > > > > >> this
> > > > >> > > > > > > > > > > >> > > expansion automatically? if yes, how
> does
> > > it
> > > > >> work?
> > > > >> > > If
> > > > >> > > > > not,
> > > > >> > > > > > > are
> > > > >> > > > > > > > > > there
> > > > >> > > > > > > > > > > >> > plans
> > > > >> > > > > > > > > > > >> > > to support it in the future?
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > > Thus user's job should not need to
> > > > bootstrap
> > > > >> > > > key/value
> > > > >> > > > > > > store
> > > > >> > > > > > > > > > from
> > > > >> > > > > > > > > > > >> the
> > > > >> > > > > > > > > > > >> > > changelog topic.
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > Why is this discussion relevant here?
> > > > Key/value
> > > > >> > > store
> > > > >> > > > /
> > > > >> > > > > > > > > changelog
> > > > >> > > > > > > > > > > >> topic
> > > > >> > > > > > > > > > > >> > > partition is scoped with the context
> of a
> > > > task.
> > > > >> > > Since
> > > > >> > > > we
> > > > >> > > > > > are
> > > > >> > > > > > > > not
> > > > >> > > > > > > > > > > >> changing
> > > > >> > > > > > > > > > > >> > > the number of tasks, I don't think it
> is
> > > > >> required
> > > > >> > to
> > > > >> > > > > > mention
> > > > >> > > > > > > > it
> > > > >> > > > > > > > > > > here.
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > > The new method takes the
> > > > >> > > > SystemStreamPartition-to-Task
> > > > >> > > > > > > > > > assignment
> > > > >> > > > > > > > > > > >> from
> > > > >> > > > > > > > > > > >> > > the previous job model which can be
> read
> > > from
> > > > >> the
> > > > >> > > > > > > coordinator
> > > > >> > > > > > > > > > > stream.
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > Jobmodel is currently not persisted to
> > > > >> coordinator
> > > > >> > > > > stream.
> > > > >> > > > > > > In
> > > > >> > > > > > > > > your
> > > > >> > > > > > > > > > > >> > design,
> > > > >> > > > > > > > > > > >> > > you talk about writing separate
> > coordinator
> > > > >> > messages
> > > > >> > > > for
> > > > >> > > > > > > > > > ssp-to-task
> > > > >> > > > > > > > > > > >> > > assignments. Hence, please correct this
> > > > >> statement.
> > > > >> > > It
> > > > >> > > > is
> > > > >> > > > > > > kind
> > > > >> > > > > > > > of
> > > > >> > > > > > > > > > > >> > misleading
> > > > >> > > > > > > > > > > >> > > to the reader.
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > My biggest gripe with this SEP is that
> it
> > > > seems
> > > > >> > > like a
> > > > >> > > > > > > > > tailor-made
> > > > >> > > > > > > > > > > >> > solution
> > > > >> > > > > > > > > > > >> > > that relies on the semantics of the
> Kafka
> > > > >> system
> > > > >> > and
> > > > >> > > > > yet,
> > > > >> > > > > > we
> > > > >> > > > > > > > are
> > > > >> > > > > > > > > > > >> trying
> > > > >> > > > > > > > > > > >> > to
> > > > >> > > > > > > > > > > >> > > masquerade that as operational
> > requirements
> > > > for
> > > > >> > > other
> > > > >> > > > > > > systems
> > > > >> > > > > > > > > > > >> interacting
> > > > >> > > > > > > > > > > >> > > with Samza. (Not to say that this is
> the
> > > > first
> > > > >> > time
> > > > >> > > > > such a
> > > > >> > > > > > > > > choice
> > > > >> > > > > > > > > > is
> > > > >> > > > > > > > > > > >> > being
> > > > >> > > > > > > > > > > >> > > made in the Samza design). I am not
> > seeing
> > > > how
> > > > >> > this
> > > > >> > > > can
> > > > >> > > > > be a
> > > > >> > > > > > > > > > "general"
> > > > >> > > > > > > > > > > >> > > solution for all input systems. That's
> my
> > > two
> > > > >> > > cents. I
> > > > >> > > > > > would
> > > > >> > > > > > > > > like
> > > > >> > > > > > > > > > to
> > > > >> > > > > > > > > > > >> hear
> > > > >> > > > > > > > > > > >> > > alternative points of view for this
> from
> > > > other
> > > > >> > devs.
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > Please make sure you have enough eyes
> on
> > > this
> > > > >> SEP.
> > > > >> > > If
> > > > >> > > > > you
> > > > >> > > > > > > do,
> > > > >> > > > > > > > > > please
> > > > >> > > > > > > > > > > >> > start
> > > > >> > > > > > > > > > > >> > > a VOTE thread to approve this SEP.
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > Thanks!
> > > > >> > > > > > > > > > > >> > > Navina
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong
> > Lin
> > > <
> > > > >> > > > > > > > lindon...@gmail.com
> > > > >> > > > > > > > > >
> > > > >> > > > > > > > > > > >> wrote:
> > > > >> > > > > > > > > > > >> > >
> > > > >> > > > > > > > > > > >> > > > Hey Navina,
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > I have updated the wiki based on your
> > > > >> > suggestion.
> > > > >> > > > More
> > > > >> > > > > > > > > > > >> specifically, I
> > > > >> > > > > > > > > > > >> > > have
> > > > >> > > > > > > > > > > >> > > > made the following changes:
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > - Improved Problem section and
> > Motivation
> > > > >> > section
> > > > >> > > to
> > > > >> > > > > > > > describe
> > > > >> > > > > > > > > > why
> > > > >> > > > > > > > > > > we
> > > > >> > > > > > > > > > > >> > use
> > > > >> > > > > > > > > > > >> > > > the solution in this proposal instead
> > of
> > > > >> > tackling
> > > > >> > > > the
> > > > >> > > > > > > > problem
> > > > >> > > > > > > > > of
> > > > >> > > > > > > > > > > >> task
> > > > >> > > > > > > > > > > >> > > > expansion directly.
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > - Illustrate the design in a way that
> > > > doesn't
> > > > >> > bind
> > > > >> > > > to
> > > > >> > > > > > > Kafka.
> > > > >> > > > > > > > > > Kafka
> > > > >> > > > > > > > > > > >> is
> > > > >> > > > > > > > > > > >> > > only
> > > > >> > > > > > > > > > > >> > > > used as example to illustrate why we
> > want
> > > > to
> > > > >> > > support
> > > > >> > > > > > > > partition
> > > > >> > > > > > > > > > > >> expansion
> > > > >> > > > > > > > > > > >> > > and
> > > > >> > > > > > > > > > > >> > > > whether the operational requirement
> can
> > > be
> > > > >> > > supported
> > > > >> > > > > > when
> > > > >> > > > > > > > > Kafka
> > > > >> > > > > > > > > > is
> > > > >> > > > > > > > > > > >> used
> > > > >> > > > > > > > > > > >> > > as
> > > > >> > > > > > > > > > > >> > > > the input system. Note that the
> > proposed
> > > > >> > solution
> > > > >> > > > > should
> > > > >> > > > > > > > work
> > > > >> > > > > > > > > > for
> > > > >> > > > > > > > > > > >> any
> > > > >> > > > > > > > > > > >> > > input
> > > > >> > > > > > > > > > > >> > > > system that meets the operational
> > > > requirement
> > > > >> > > > > described
> > > > >> > > > > > in
> > > > >> > > > > > > > the
> > > > >> > > > > > > > > > > wiki.
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > - Fixed the problem in the figure.
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > > > > >> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki.
> > > > >> > > > > > > > > > > > > > >> > > > Together with GroupByPartitionFixedTaskNum, it should ensure that we have a
> > > > >> > > > > > > > > > > > > > >> > > > solution to enable partition expansion for all users that are using a
> > > > >> > > > > > > > > > > > > > >> > > > pre-defined grouper in Samza. Note that users who use a custom grouper
> > > > >> > > > > > > > > > > > > > >> > > > would need to update their implementation.
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > > > > >> > > > Can you let me know if this, together with the answers in the previous
> > > > >> > > > > > > > > > > > > > >> > > > email, addresses all your questions? Thanks for taking time to review the
> > > > >> > > > > > > > > > > > > > >> > > > proposal.
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > Regards,
> > > > >> > > > > > > > > > > >> > > > Dong
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > > > > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >> > > > > > > > > > > >> > > >
> > > > >> > > > > > > > > > > >> > > > > Hey Navina,
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > > > > >> > > > > Thanks much for your comments. Please see my reply inline.
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > > > > >> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > > > > >> > > > >> Thanks for the SEP, Dong. I have a couple of questions to understand your
> > > > >> > > > > > > > > > > > > > >> > > > >> proposal better:
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > > > > >> > > > >> * Under motivation, you mention that "_We expect this solution to work
> > > > >> > > > > > > > > > > > > > >> > > > >> similarly with other input system as well._", yet I don't see any
> > > > >> > > > > > > > > > > > > > >> > > > >> discussion on how it will work with other input systems. That is, what
> > > > >> > > > > > > > > > > > > > >> > > > >> kind of contract does Samza expect from other input systems? If we are
> > > > >> > > > > > > > > > > > > > >> > > > >> not planning to provide a generic solution, it might be worth calling it
> > > > >> > > > > > > > > > > > > > >> > > > >> out in the SEP.
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > > > > >> > > > > I think the contract we expect from other systems is exactly the
> > > > >> > > > > > > > > > > > > > >> > > > > operational requirement mentioned in the SEP, i.e. the number of
> > > > >> > > > > > > > > > > > > > >> > > > > partitions should always be doubled, and the partition of a message
> > > > >> > > > > > > > > > > > > > >> > > > > should be its key hash modulo the number of partitions. SEP-5 should
> > > > >> > > > > > > > > > > > > > >> > > > > also allow partition expansion of all input systems that meet these two
> > > > >> > > > > > > > > > > > > > >> > > > > requirements. I have updated the motivation section to clarify this.
> > > > >> > > > > > > > > > > >> > > > >
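To make the two requirements above concrete, here is a minimal Python sketch (not Samza code). It assumes integer key hashes and uses hypothetical helper names, `partition_for` and `original_partition`. It checks that when partition counts only ever double and partitions are chosen as hash modulo the partition count, a message's new partition always maps back to the partition its key occupied before expansion.

```python
def partition_for(key_hash: int, num_partitions: int) -> int:
    """Assumed producer-side rule: hash modulo the current partition count."""
    return key_hash % num_partitions


def original_partition(partition: int, original_count: int) -> int:
    """Map a post-expansion partition back to the pre-expansion partition."""
    return partition % original_count


original_count = 2
for doublings in range(1, 4):
    expanded_count = original_count * 2 ** doublings  # 4, 8, 16
    for key_hash in range(1000):
        new_partition = partition_for(key_hash, expanded_count)
        # Because original_count divides expanded_count, the key lands
        # where it would have landed before the expansion.
        assert original_partition(new_partition, original_count) == \
            partition_for(key_hash, original_count)
```

The property relies only on the original count dividing the expanded count, which doubling guarantees.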
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > > > > >> > > > >> * I understand the partition mapping logic you have proposed. But I think
> > > > >> > > > > > > > > > > > > > >> > > > >> the example explanation doesn't match the diagram. In the diagram, after
> > > > >> > > > > > > > > > > > > > >> > > > >> expansion, partition-0 and partition-1 are pointing to bucket 0 and
> > > > >> > > > > > > > > > > > > > >> > > > >> partition-3 and partition-4 are pointing to bucket 1. I think the former
> > > > >> > > > > > > > > > > > > > >> > > > >> has to be partition-0 and partition-2, and the latter partition-1 and
> > > > >> > > > > > > > > > > > > > >> > > > >> partition-3. If I am wrong, please help me understand the logic :)
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > > > > >> > > > > Good catch. I will update the figure to fix this problem.
> > > > >> > > > > > > > > > > >> > > > >
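The corrected mapping discussed above (partition-0 and partition-2 in bucket 0, partition-1 and partition-3 in bucket 1) can be sketched in a few lines of Python. This is an illustration only, not the GroupByPartitionFixedTaskNum implementation; the function name `fixed_task_grouping` and the use of plain integers for partitions and buckets are assumptions.

```python
from collections import defaultdict


def fixed_task_grouping(partitions, original_count):
    """Assign each partition to the bucket (task) of its pre-expansion partition."""
    buckets = defaultdict(list)
    for partition in sorted(partitions):
        buckets[partition % original_count].append(partition)
    return dict(buckets)


# A topic that started with 2 partitions and was expanded to 4.
print(fixed_task_grouping(range(4), 2))  # {0: [0, 2], 1: [1, 3]}
```

The number of buckets stays at the original partition count, so the task count does not change after expansion.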
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > > > > >> > > > >> * I don't know how partition expansion in Kafka works. I am familiar with
> > > > >> > > > > > > > > > > > > > >> > > > >> how shard splitting happens in Kinesis - there is a hierarchical relation
> > > > >> > > > > > > > > > > > > > >> > > > >> between the parent and child shards. This way, it will also allow the
> > > > >> > > > > > > > > > > > > > >> > > > >> shards to be merged back. IIUC, Kafka only supports partition
> > > > >> > > > > > > > > > > > > > >> > > > >> "expansion", as opposed to "splits". Can you provide some context or a
> > > > >> > > > > > > > > > > > > > >> > > > >> link related to how partition expansion works in Kafka?
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > > > > >> > > > > I couldn't find any wiki on partition expansion in Kafka. The partition
> > > > >> > > > > > > > > > > > > > >> > > > > expansion logic in Kafka is very simple: it just adds new partitions to
> > > > >> > > > > > > > > > > > > > >> > > > > the existing topic. Is there a specific question you have regarding
> > > > >> > > > > > > > > > > > > > >> > > > > partition expansion in Kafka?
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > > > > >> > > > >> * Are you only recommending that expansion can be supported for Samza
> > > > >> > > > > > > > > > > > > > >> > > > >> jobs that use Kafka as the input system **and** configure the SSPGrouper
> > > > >> > > > > > > > > > > > > > >> > > > >> as GroupByPartitionFixedTaskNum? Sounds to me like this only applies to
> > > > >> > > > > > > > > > > > > > >> > > > >> GroupByPartition. Please correct me if I am wrong. What is the
> > > > >> > > > > > > > > > > > > > >> > > > >> expectation for custom SSP Groupers?
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > > > > >> > > > > The expansion can be supported for Samza jobs if the input system meets
> > > > >> > > > > > > > > > > > > > >> > > > > the operational requirement mentioned above. The job doesn't have to use
> > > > >> > > > > > > > > > > > > > >> > > > > Kafka as the input system.
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > > > > >> > > > > The current proposal provides a solution for jobs that currently use
> > > > >> > > > > > > > > > > > > > >> > > > > GroupByPartition. The proposal can be extended to support jobs that use
> > > > >> > > > > > > > > > > > > > >> > > > > other groupers that are pre-defined in Samza. A custom SSP grouper needs
> > > > >> > > > > > > > > > > > > > >> > > > > to handle partition expansion similarly to how
> > > > >> > > > > > > > > > > > > > >> > > > > GroupByPartitionFixedTaskNum handles it, and it is the users'
> > > > >> > > > > > > > > > > > > > >> > > > > responsibility to update their custom grouper implementation.
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > > > > >> > > > >> * Regarding storing the SSP-to-Task assignment in the coordinator stream:
> > > > >> > > > > > > > > > > > > > >> > > > >> Today, the JobModel encapsulates the data model in Samza, which also
> > > > >> > > > > > > > > > > > > > >> > > > >> includes **TaskModels**. A TaskModel typically shows the task-to-sspList
> > > > >> > > > > > > > > > > > > > >> > > > >> mapping. What is the reason for using a separate coordinator stream
> > > > >> > > > > > > > > > > > > > >> > > > >> message *SetSSPTaskMapping*? Is it because the JobModel itself is not
> > > > >> > > > > > > > > > > > > > >> > > > >> persisted in the coordinator stream today? The reason locality exists
> > > > >> > > > > > > > > > > > > > >> > > > >> outside of the JobModel is that *locality* information is written by
> > > > >> > > > > > > > > > > > > > >> > > > >> each container, whereas it is consumed only by the leader
> > > > >> > > > > > > > > > > > > > >> > > > >> jobcoordinator/AM. In this case, the writer and the reader of the mapping
> > > > >> > > > > > > > > > > > > > >> > > > >> information are both the leader jobcoordinator/AM. So, I want to
> > > > >> > > > > > > > > > > > > > >> > > > >> understand the motivation for this choice.
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > > > > >> > > > > Yes, the reason for using a separate coordinator stream message is that
> > > > >> > > > > > > > > > > > > > >> > > > > the task-to-sspList mapping is not currently persisted in the
> > > > >> > > > > > > > > > > > > > >> > > > > coordinator stream. We wouldn't need to create this new stream message
> > > > >> > > > > > > > > > > > > > >> > > > > if the JobModel were persisted. We need to persist the task-to-sspList
> > > > >> > > > > > > > > > > > > > >> > > > > mapping in the coordinator stream so that the job can derive the
> > > > >> > > > > > > > > > > > > > >> > > > > original number of partitions of each input stream regardless of how
> > > > >> > > > > > > > > > > > > > >> > > > > many times the stream's partitions have been expanded. Does this make
> > > > >> > > > > > > > > > > > > > >> > > > > sense?
> > > > >> > > > > > > > > > > >> > > > >
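As a rough illustration of why a persisted task-to-sspList mapping is enough to recover the original partition count, consider the Python sketch below. It is not Samza code and rests on a stated assumption: the job used GroupByPartition-style grouping, so each task originally held exactly one partition of each input stream. The function name, the task names, and the "PageView" stream are hypothetical.

```python
def original_partition_counts(task_to_ssps):
    """Count, per stream, how many tasks hold at least one of its partitions.

    task_to_ssps maps a task name to a list of (stream, partition) pairs.
    Under the one-partition-per-task assumption, that count equals the
    stream's partition count before any expansion.
    """
    counts = {}
    for ssps in task_to_ssps.values():
        for stream in {stream for stream, _ in ssps}:
            counts[stream] = counts.get(stream, 0) + 1
    return counts


# Mapping as it might look after one expansion from 2 to 4 partitions.
persisted = {
    "Partition 0": [("PageView", 0), ("PageView", 2)],
    "Partition 1": [("PageView", 1), ("PageView", 3)],
}
print(original_partition_counts(persisted))  # {'PageView': 2}
```

Because the task set is fixed, the result is the same no matter how many times the stream has been doubled.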
> > > > >> > > > > > > > > > > > > > >> > > > > I am not sure how this is related to locality, though. Can you clarify
> > > > >> > > > > > > > > > > > > > >> > > > > your question if I haven't answered it?
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > > Thanks!
> > > > >> > > > > > > > > > > >> > > > > Dong
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > >
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > >> > > > >> Cheers!
> > > > >> > > > > > > > > > > >> > > > >> Navina
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > > > > >> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >> > > > > > > > > > > >> > > > >>
> > > > >> > > > > > > > > > > >> > > > >> > Hi all,
> > > > >> > > > > > > > > > > >> > > > >> >
> > > > >> > > > > > > > > > > > > > >> > > > >> > We created SEP-5: Enable partition expansion of input streams. Please
> > > > >> > > > > > > > > > > > > > >> > > > >> > find the SEP wiki at the link
> > > > >> > > > > > > > > > > > > > >> > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > > > >> > > > > > > > > > > >> > > > >> >
> > > > >> > > > > > > > > > > > > > >> > > > >> > Your feedback is appreciated!
> > > > >> > > > > > > > > > > >> > > > >> >
> > > > >> > > > > > > > > > > >> > > > >> > Thanks,
> > > > >> > > > > > > > > > > >> > > > >> > Dong
> > > > >> > > > > > > > > > > >> > > > >> >
> > > > >> --
> > > > >> We are hiring in Streams Infra (Kafka/Samza/Datastream) !!
> > > > >>
> > > > >
> > > > >
> > > >