Hi Stan,

> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3] on
> a lot of partitions and the user runs a massive rebalance to change them
> all to [3, 2, 1]. In the old behavior, I think that this would not do
> anything but simply change the replica set in ZK.
> Then, the user could run kafka-preferred-replica-election.sh on a given set
> of partitions to make sure the new leader 3 gets elected.

I thought the old algorithm would elect 3 as the leader right away in this
case, but I have to double-check that. In any case, I think it would make
sense in the new algorithm to elect the new preferred leader right away,
regardless of whether the new leader is chosen from the existing replicas
or not. If the whole reassignment is in fact just a change in replica
order, then it is a simple (trivial) operation either way, and batching it
wouldn't slow it down much since there is no data movement involved. If the
reassignment is mixed, meaning it contains both reordering and real
movement, then electing leaders eagerly would help even out the load
faster, as data movements can take a long time. For instance, take a
reassignment batch of two partitions running concurrently: P1: (1,2,3) ->
(3,2,1) and P2: (4,5,6) -> (7,8,9). The P2 reassignment would elect a new
leader but P1 wouldn't, which doesn't help much with the goal of
normalizing traffic on broker 1.
Again, I'll have to check how the current algorithm works and whether
implementing what I sketched above has any unknown drawbacks.
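
To make the idea concrete, here's a minimal, self-contained sketch of what
the first step of each reassignment could do (hypothetical names, Scala
pseudo code in the same spirit as the snippet further down in the thread,
not the actual controller code):

case class PartitionReassignment(partition: String, current: Seq[Int], target: Seq[Int])

// First step of a reassignment: always try to install the new preferred
// leader (the head of the target replica list) right away.
def firstStep(r: PartitionReassignment): String = {
  val preferredLeader = r.target.head
  if (r.current.contains(preferredLeader))
    s"${r.partition}: elect broker $preferredLeader now (pure reorder, no data movement)"
  else
    s"${r.partition}: start replicating to broker $preferredLeader, elect it once it's in ISR"
}

// P1 is a pure reorder, so broker 3 could take leadership right away and
// ease the load on broker 1 without waiting for P2's (long) data movement.
val p1 = PartitionReassignment("P1", Seq(1, 2, 3), Seq(3, 2, 1))
val p2 = PartitionReassignment("P2", Seq(4, 5, 6), Seq(7, 8, 9))
Seq(p1, p2).map(firstStep).foreach(println)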

As for generic preferred leader election, when called from the admin API or
the auto leader balancing feature, I think you're right that we should
leave it as it is. It doesn't involve any data movement, so it's fairly
fast and normalizes the cluster state quickly.
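
For reference, triggering that election through the admin API you mention
below would look roughly like this (a sketch against the 2.x
AdminClient.electPreferredLeaders call; exact result handling may differ):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)
try {
  // Ask the controller to move partition 0 of topic "a" back to its preferred leader.
  val result = admin.electPreferredLeaders(Collections.singleton(new TopicPartition("a", 0)))
  result.all().get() // block until the controller has processed the election
} finally {
  admin.close()
}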

Viktor

On Tue, Jul 9, 2019 at 9:04 PM Stanislav Kozlovski <stanis...@confluent.io>
wrote:

> Hey Viktor,
>
> > I think it is intuitive if they are on a global level...If we applied
> > them on every batch then we
> > couldn't really guarantee any limits as the user would be able to get
> > around it with submitting lots of reassignments.
>
>
> Agreed. Could we mention this explicitly in the KIP?
>
> > Also if I understand correctly, AlterPartitionAssignmentsRequest would be a
> > partition level batching, isn't it? So if you submit 10 partitions at once
> > then they'll all be started by the controller immediately as per my
> > understanding.
>
>
> Yep, absolutely
>
> > I've raised the ordering problem on the discussion of KIP-455 in a bit
> > different form and as I remember the verdict there was that we shouldn't
> > expose ordering as an API. It might not be easy as you say and there might
> > be much better strategies to follow (like disk or network utilization
> > goals). Therefore I'll remove this section from the KIP.
>
>
> Sounds good to me.
>
> > I'm not sure I get this scenario. So are you saying that after they
> > submitted a reassignment they also submit a preferred leader change?
> > In my mind what I would do is:
> > i) make auto.leader.rebalance.enable obey the leader movement limit as
> > this way it will be easier to calculate the reassignment batches.
> >
>
> Sorry, this is my fault -- I should have been more clear.
> First, I don't think I thought this through well enough at the time.
> If we have replicas=[1, 2, 3] and we reassign them to [4, 5, 6], it is
> obvious that a leader shift will happen. Your KIP proposes we shift
> replicas 1 and 4 first.
>
> I meant the following (maybe rare) scenario - we have replicas [1, 2, 3] on
> a lot of partitions and the user runs a massive rebalance to change them
> all to [3, 2, 1]. In the old behavior, I think that this would not do
> anything but simply change the replica set in ZK.
> Then, the user could run kafka-preferred-replica-election.sh on a given set
> of partitions to make sure the new leader 3 gets elected.
>
> > ii) the user could submit preferred leader election but it's basically a
> > form of reassignment so it would fall under the batching criteria. If the
> > batch they submit is smaller than the internal one, then it'd be executed
> > immediately but otherwise it would be split into more batches. It might be
> > a different behavior as it may not be executed in one batch but I think
> > it isn't a problem because we'll default to Int.MAX with the batches, and
> > otherwise, since it's a form of reassignment, I think it makes sense
> > to apply the same logic.
>
>
> The AdminAPI for that is "electPreferredLeaders(Collection<TopicPartition>
> partitions)" and the old zNode is "{"partitions": [{"topic": "a",
> "partition": 0}]}" so it is a bit less explicit than our other reassignment
> API, but the functionality is the same.
> You're 100% right that it is a form of reassignment, but I hadn't thought
> of it like that, and I assume some other people wouldn't have either.
> If I understand correctly, you're suggesting that we count the
> "reassignment.parallel.leader.movements" config against such preferred
> elections. I think that makes sense. If we are to do that we should
> explicitly mention that this KIP touches the preferred election logic as
> well. Would that config also bound the number of leader movements that the
> auto rebalance inside the broker performs?
>
> Then again, the "reassignment.parallel.leader.movements" config has a bit
> of a different meaning when used in the context of a real reassignment
> (where data moves from the leader to N more replicas) versus in the
> preferred election switch (where all we need is two lightweight
> LeaderAndIsr requests), so I am not entirely convinced we want to use
> the same config for that.
> It might be easier to limit the scope of this KIP and not place a bound on
> preferred leader election.
>
> Thanks,
> Stanislav
>
>
> On Mon, Jul 8, 2019 at 4:28 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com>
> wrote:
>
> > Hey Stanislav,
> >
> > Thanks for the thorough look at the KIP! :)
> >
> > > Let me first explicitly confirm my understanding of the configs and the
> > > algorithm:
> > > * reassignment.parallel.replica.count - the maximum total number of
> > > replicas that we can move at once, *per partition*
> > > * reassignment.parallel.partition.count - the maximum number of
> > > partitions we can move at once
> > > * reassignment.parallel.leader.movements - the maximum number of leader
> > > movements we can have at once
> >
> > Yes.
> >
> > > As far as I currently understand it, your proposed algorithm will
> > > naturally prioritize leader movement first. e.g if
> > > reassignment.parallel.replica.count=1 and
> > > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > > we will always move one, the first possible, replica in the replica set
> > > (which will be the leader if part of the excess replica set (ER)).
> > > Am I correct in saying that?
> >
> > Yes.
> >
> > > 1. Does it make sense to add `max` somewhere in the configs' names?
> >
> > If you imply that it's not always an exact number (because the last batch
> > could be smaller) then I think it's a good idea. I don't mind this naming
> > or having max in it either.
> >
> > > 2. How does this KIP play along with KIP-455's notion of multiple
> > > rebalances - do the configs apply on a
> > > single AlterPartitionAssignmentsRequest or are they global?
> >
> > I think it is intuitive if they are on a global level and the user would
> > have the option to set KIP-435's settings to Int.MAX and thus eliminate any
> > batching and submit their own batches through
> > AlterPartitionAssignmentsRequest. If we applied them on every batch then we
> > couldn't really guarantee any limits as the user would be able to get
> > around it with submitting lots of reassignments. By default though we set
> > the batching limits to Int.MAX so effectively they're disabled.
> > Also if I understand correctly, AlterPartitionAssignmentsRequest would be a
> > partition level batching, isn't it? So if you submit 10 partitions at once
> > then they'll all be started by the controller immediately as per my
> > understanding.
> >
> > > 3. Unless I've missed it, the algorithm does not take into account
> > > `reassignment.parallel.leader.movements`
> >
> > Yeah, the reassignment step calculation doesn't contain that because it
> > describes only one partition's reassignment step calculation. We simply
> > fill our reassignment batch with as many leader movements as we can and
> > then fill the rest with reassignments which don't require leader movement,
> > if we have space. I'll create a pseudo code block in the KIP for this.
> >
> > > 4. The KIP says that the order of the input has some control over how the
> > > batches are created - i.e it's deterministic. What would the batches of the
> > > following reassignment look like:
> > > reassignment.parallel.replica.count=1
> > > reassignment.parallel.partition.count=MAX_INT
> > > reassignment.parallel.leader.movements=1
> > > partitionA - (0,1,2) -> (3, 4, 5)
> > > partitionB - (0,1,2) -> (3,4,5)
> > > partitionC - (0,1,2) -> (3, 4, 5)
> > > From my understanding, we would start with A(0->3), B(1->4) and C(1->4). Is
> > > that correct? Would the second step then continue with B(0->3)?
> > >
> > > If the configurations are global, I can imagine we will have a bit more
> > > trouble in preserving the expected ordering, especially across controller
> > > failovers -- but I'll avoid speculating until you confirm the scope of the
> > > config.
> >
> > I've raised the ordering problem on the discussion of KIP-455 in a bit
> > different form and as I remember the verdict there was that we shouldn't
> > expose ordering as an API. It might not be easy as you say and there might
> > be much better strategies to follow (like disk or network utilization
> > goals). Therefore I'll remove this section from the KIP. (Otherwise yes, I
> > would have thought of the same behavior as what you described.)
> > Since #3 also was about the pseudo code, it would be something like this:
> > Config:
> > reassignment.parallel.replica.count=R
> > reassignment.parallel.partition.count=P
> > reassignment.parallel.leader.movements=L
> > Code:
> > val batchSize = max(L, P)
> > // split the individual partition reassignments by whether they involve
> > // leader movement or not
> > val partitionMovements =
> >   calculateReassignmentStepsFor(partitionsToReassign).partition(partitionReassignment.involvesLeaderReassignment)
> > // fill the batch with as many leader movements as possible and take the
> > // rest from other reassignments
> > val currentBatch =
> >   if (partitionMovements.leaderMovements.size < batchSize)
> >     partitionMovements.leaderMovements ++
> >       partitionMovements.otherPartitionMovements.take(batchSize -
> >         partitionMovements.leaderMovements.size)
> >   else
> >     partitionMovements.leaderMovements.take(batchSize)
> > executeReassignmentOnBatch(currentBatch)
> >
> > > 5. Regarding the new behavior of electing the new preferred leader in the
> > > "first step" of the reassignment - does this obey the
> > > `auto.leader.rebalance.enable` config?
> > > If not, I have concerns regarding how backwards compatible this might be -
> > > e.g imagine a user does a huge reassignment (as they have always done) and
> > > suddenly a huge leader shift happens, whereas the user expected to manually
> > > shift preferred leaders at a slower rate via
> > > the kafka-preferred-replica-election.sh tool.
> >
> > I'm not sure I get this scenario. So are you saying that after they
> > submitted a reassignment they also submit a preferred leader change?
> > In my mind what I would do is:
> > i) make auto.leader.rebalance.enable obey the leader movement limit as
> > this way it will be easier to calculate the reassignment batches.
> > ii) the user could submit preferred leader election but it's basically a
> > form of reassignment so it would fall under the batching criteria. If the
> > batch they submit is smaller than the internal one, then it'd be executed
> > immediately but otherwise it would be split into more batches. It might be
> > a different behavior as it may not be executed in one batch but I think
> > it isn't a problem because we'll default to Int.MAX with the batches, and
> > otherwise, since it's a form of reassignment, I think it makes sense
> > to apply the same logic.
> >
> > > 6. What is the expected behavior if we dynamically change one of the
> > > configs to a lower value while a reassignment is happening. Would we cancel
> > > some of the currently reassigned partitions or would we account for the new
> > > values on the next reassignment? I assume the latter but it's good to be
> > > explicit
> >
> > Yep, it would be the latter. I'll make this explicit in the KIP too.
> >
> > About the nits:
> > Noted, will update the KIP to reflect your suggestions :)
> >
> > Viktor
> >
> > On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <
> > stanis...@confluent.io>
> > wrote:
> > >
> > > Hey there Viktor,
> > >
> > > Thanks for working on this KIP! I agree with the notion that reliability,
> > > stability and predictability of a reassignment should be a core feature of
> > > Kafka.
> > >
> > > Let me first explicitly confirm my understanding of the configs and the
> > > algorithm:
> > > * reassignment.parallel.replica.count - the maximum total number of
> > > replicas that we can move at once, *per partition*
> > > * reassignment.parallel.partition.count - the maximum number of
> > > partitions we can move at once
> > > * reassignment.parallel.leader.movements - the maximum number of leader
> > > movements we can have at once
> > >
> > > As far as I currently understand it, your proposed algorithm will
> > > naturally prioritize leader movement first. e.g if
> > > reassignment.parallel.replica.count=1 and
> > > reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> > > we will always move one, the first possible, replica in the replica set
> > > (which will be the leader if part of the excess replica set (ER)).
> > > Am I correct in saying that?
> > >
> > > Regarding the KIP, I've got a couple of comments/questions::
> > >
> > > 1. Does it make sense to add `max` somewhere in the configs' names?
> > >
> > > 2. How does this KIP play along with KIP-455's notion of multiple
> > > rebalances - do the configs apply on a
> > > single AlterPartitionAssignmentsRequest or are they global?
> > >
> > > 3. Unless I've missed it, the algorithm does not take into account
> > > `reassignment.parallel.leader.movements`
> > >
> > > 4. The KIP says that the order of the input has some control over how the
> > > batches are created - i.e it's deterministic. What would the batches of the
> > > following reassignment look like:
> > > reassignment.parallel.replica.count=1
> > > reassignment.parallel.partition.count=MAX_INT
> > > reassignment.parallel.leader.movements=1
> > > partitionA - (0,1,2) -> (3, 4, 5)
> > > partitionB - (0,1,2) -> (3,4,5)
> > > partitionC - (0,1,2) -> (3, 4, 5)
> > > From my understanding, we would start with A(0->3), B(1->4) and C(1->4). Is
> > > that correct? Would the second step then continue with B(0->3)?
> > >
> > > If the configurations are global, I can imagine we will have a bit more
> > > trouble in preserving the expected ordering, especially across controller
> > > failovers -- but I'll avoid speculating until you confirm the scope of the
> > > config.
> > >
> > > 5. Regarding the new behavior of electing the new preferred leader in the
> > > "first step" of the reassignment - does this obey the
> > > `auto.leader.rebalance.enable` config?
> > > If not, I have concerns regarding how backwards compatible this might be -
> > > e.g imagine a user does a huge reassignment (as they have always done) and
> > > suddenly a huge leader shift happens, whereas the user expected to manually
> > > shift preferred leaders at a slower rate via
> > > the kafka-preferred-replica-election.sh tool.
> > >
> > > 6. What is the expected behavior if we dynamically change one of the
> > > configs to a lower value while a reassignment is happening. Would we cancel
> > > some of the currently reassigned partitions or would we account for the new
> > > values on the next reassignment? I assume the latter but it's good to be
> > > explicit
> > >
> > > As some small nits:
> > > - could we have a section in the KIP where we explicitly define what each
> > > config does? This can be inferred from the KIP as is but requires careful
> > > reading, whereas some developers might want to skim through the change to
> > > get a quick sense. It also improves readability but that's my personal
> > > opinion.
> > > - could you better clarify how a reassignment step is different from the
> > > currently existing algorithm? maybe laying both algorithms out in the KIP
> > > would be most clear
> > > - the names for the OngoingPartitionReassignment and
> > > CurrentPartitionReassignment fields in the
> > > ListPartitionReassignmentsResponse are a bit confusing to me.
> > > Unfortunately, I don't have a better suggestion, but maybe somebody else in
> > > the community has.
> > >
> > > Thanks,
> > > Stanislav
> > >
> > > On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
> > viktorsomo...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I've renamed my KIP as its name was a bit confusing, so we'll continue it
> > > > in this thread.
> > > > The previous thread, for the record:
> > > > https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
> > > >
> > > > A short summary of the KIP:
> > > > In case of a vast partition reassignment (thousands of partitions at once)
> > > > Kafka can collapse under the increased replication traffic. This KIP will
> > > > mitigate that by introducing internal batching done by the controller.
> > > > Besides putting a bandwidth limit on the replication, it is useful to batch
> > > > partition movements, as a smaller number of partitions will use the
> > > > available bandwidth for reassignment and thus finish faster.
> > > > The main control handles are:
> > > > - the number of parallel leader movements,
> > > > - the number of parallel partition movements,
> > > > - and the number of parallel replica movements.
> > > >
> > > > Thank you for the feedback and ideas so far in the previous thread, and I'm
> > > > happy to receive more.
> > > >
> > > > Regards,
> > > > Viktor
> > > >
> > >
> > >
> > > --
> > > Best,
> > > Stanislav
> >
>
>
> --
> Best,
> Stanislav
>
