Hey Stanislav,

Thanks for the thorough look at the KIP! :)

> Let me first explicitly confirm my understanding of the configs and the
> algorithm:
> * reassignment.parallel.replica.count - the maximum total number of
> replicas that we can move at once, *per partition*
> * reassignment.parallel.partition.count - the maximum number of partitions
> we can move at once
> * reassignment.parallel.leader.movements - the maximum number of leader
> movements we can have at once

Yes.
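
To make sure we're talking about the same thing, with hypothetical values (picked purely for illustration) this would mean:
reassignment.parallel.replica.count=2 (at most 2 replicas of any single partition are moved at once)
reassignment.parallel.partition.count=10 (at most 10 partitions are reassigned in parallel)
reassignment.parallel.leader.movements=3 (at most 3 of those reassignments may involve a leader movement at once)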

> As far as I currently understand it, your proposed algorithm will naturally
> prioritize leader movement first. e.g if
> reassignment.parallel.replica.count=1 and
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> we will always move one, the first possible, replica in the replica set
> (which will be the leader if part of the excess replica set (ER)).
> Am I correct in saying that?

Yes.

> 1. Does it make sense to add `max` somewhere in the configs' names?

If you mean that it's not always an exact number (because the last batch
could be smaller), then I think it's a good idea. I'm fine with either the
current naming or adding max to it.

> 2. How does this KIP play along with KIP-455's notion of multiple
> rebalances - do the configs apply on a
> single AlterPartitionAssignmentsRequest or are they global?

I think it's more intuitive if they apply globally: the user would still
have the option to set KIP-435's settings to Int.MAX, thus eliminating any
internal batching, and submit their own batches through
AlterPartitionAssignmentsRequest. If we applied them per request then we
couldn't really guarantee any limits, as the user would be able to get
around them by submitting lots of reassignments. By default though we set
the batching limits to Int.MAX, so effectively they're disabled.
Also, if I understand correctly, AlterPartitionAssignmentsRequest would be
partition-level batching, wouldn't it? So if you submit 10 partitions at
once, my understanding is that they'd all be started by the controller
immediately.
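
So for instance a user who wants to do their own batching could effectively
turn the internal batching off (which, with the proposed Int.MAX defaults, is
already the case):
reassignment.parallel.replica.count=2147483647
reassignment.parallel.partition.count=2147483647
reassignment.parallel.leader.movements=2147483647
and then control the granularity purely by how many partitions they put into
each AlterPartitionAssignmentsRequest.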

> 3. Unless I've missed it, the algorithm does not take into account
> `reassignment.parallel.leader.movements`

Yeah, the reassignment step calculation doesn't contain that because it
describes only a single partition's reassignment steps. We simply fill the
reassignment batch with as many leader movements as the limit allows and
then, if we have space left, fill the rest with reassignments that don't
require a leader movement. I'll add a pseudo code block to the KIP for this.

> 4. The KIP says that the order of the input has some control over how the
> batches are created - i.e it's deterministic. What would the batches of the
> following reassignment look like:
> reassignment.parallel.replica.count=1
> reassignment.parallel.partition.count=MAX_INT
> reassignment.parallel.leader.movements=1
> partitionA - (0,1,2) -> (3, 4, 5)
> partitionB - (0,1,2) -> (3,4,5)
> partitionC - (0,1,2) -> (3, 4, 5)
> From my understanding, we would start with A(0->3), B(1->4) and C(1->4). Is
> that correct? Would the second step then continue with B(0->3)?
>
> If the configurations are global, I can imagine we will have a bit more
> trouble in preserving the expected ordering, especially across controller
> failovers -- but I'll avoid speculating until you confirm the scope of the
> config.

I've raised the ordering problem in the KIP-455 discussion in a slightly
different form, and as I remember the verdict there was that we shouldn't
expose ordering as an API. As you say it might not be easy to preserve, and
there might be much better strategies to follow (like disk or network
utilization goals). Therefore I'll remove this section from the KIP.
(Otherwise yes, I would have expected the same behavior you described.)
Since #3 was also about the pseudo code, it would be something like this:
Config:
reassignment.parallel.replica.count=R
reassignment.parallel.partition.count=P
reassignment.parallel.leader.movements=L
Code:
// split the individual partition reassignment steps by whether they
// involve a leader movement or not
val (leaderMovements, otherMovements) =
  calculateReassignmentStepsFor(partitionsToReassign)
    .partition(_.involvesLeaderMovement)
// take at most L leader movements first
val leaderBatch = leaderMovements.take(L)
// then fill the rest of the batch (at most P partitions in total) with
// reassignments that don't require a leader movement
val currentBatch =
  if (leaderBatch.size < P)
    leaderBatch ++ otherMovements.take(P - leaderBatch.size)
  else
    leaderBatch.take(P)
executeReassignmentOnBatch(currentBatch)
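
Just to illustrate the fill logic with made-up numbers: if L=2, P=5 and the
pending reassignments split into 3 leader-moving and 10 non-leader-moving
partitions, the first batch would contain 2 leader movements plus 3 of the
other reassignments, and the remaining movements would go into subsequent
batches as the current ones finish.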

> 5. Regarding the new behavior of electing the new preferred leader in the
> "first step" of the reassignment - does this obey the
> `auto.leader.rebalance.enable` config?
> If not, I have concerns regarding how backwards compatible this might be -
> e.g imagine a user does a huge reassignment (as they have always done) and
> suddenly a huge leader shift happens, whereas the user expected to manually
> shift preferred leaders at a slower rate via
> the kafka-preferred-replica-election.sh tool.

I'm not sure I get this scenario. Are you saying that after submitting a
reassignment they also submit a preferred leader change?
In my mind what I would do is:
i) make auto.leader.rebalance.enable obey the leader movement limit, as that
way it will be easier to calculate the reassignment batches.
ii) the user could still submit a preferred leader election, but since it's
basically a form of reassignment it would fall under the same batching
criteria. If the batch they submit is smaller than the internal limit, it'd
be executed immediately, otherwise it would be split into multiple batches.
This is a behavior change, as it may not be executed in one batch anymore,
but I don't think it's a problem: we'll default the batch limits to Int.MAX,
and since a preferred leader election is a form of reassignment it makes
sense to apply the same logic to it.
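
As a made-up example: if a user requests preferred leader election for 100
partitions while reassignment.parallel.leader.movements=10, the controller
would execute it as 10 consecutive batches of 10 leader movements instead of
all at once; with the Int.MAX default it would still happen in a single
batch, just like today.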

> 6. What is the expected behavior if we dynamically change one of the
> configs to a lower value while a reassignment is happening. Would we cancel
> some of the currently reassigned partitions or would we account for the new
> values on the next reassignment? I assume the latter but it's good to be
> explicit

Yep, it would be the latter. I'll make this explicit in the KIP too.

About the nits:
Noted, will update the KIP to reflect your suggestions :)

Viktor

On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <stanis...@confluent.io>
wrote:
>
> Hey there Viktor,
>
> Thanks for working on this KIP! I agree with the notion that reliability,
> stability and predictability of a reassignment should be a core feature of
> Kafka.
>
> Let me first explicitly confirm my understanding of the configs and the
> algorithm:
> * reassignment.parallel.replica.count - the maximum total number of
> replicas that we can move at once, *per partition*
> * reassignment.parallel.partition.count - the maximum number of partitions
> we can move at once
> * reassignment.parallel.leader.movements - the maximum number of leader
> movements we can have at once
>
> As far as I currently understand it, your proposed algorithm will naturally
> prioritize leader movement first. e.g if
> reassignment.parallel.replica.count=1 and
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> we will always move one, the first possible, replica in the replica set
> (which will be the leader if part of the excess replica set (ER)).
> Am I correct in saying that?
>
> Regarding the KIP, I've got a couple of comments/questions::
>
> 1. Does it make sense to add `max` somewhere in the configs' names?
>
> 2. How does this KIP play along with KIP-455's notion of multiple
> rebalances - do the configs apply on a
> single AlterPartitionAssignmentsRequest or are they global?
>
> 3. Unless I've missed it, the algorithm does not take into account
> `reassignment.parallel.leader.movements`
>
> 4. The KIP says that the order of the input has some control over how the
> batches are created - i.e it's deterministic. What would the batches of the
> following reassignment look like:
> reassignment.parallel.replica.count=1
> reassignment.parallel.partition.count=MAX_INT
> reassignment.parallel.leader.movements=1
> partitionA - (0,1,2) -> (3, 4, 5)
> partitionB - (0,1,2) -> (3,4,5)
> partitionC - (0,1,2) -> (3, 4, 5)
> From my understanding, we would start with A(0->3), B(1->4) and C(1->4). Is
> that correct? Would the second step then continue with B(0->3)?
>
> If the configurations are global, I can imagine we will have a bit more
> trouble in preserving the expected ordering, especially across controller
> failovers -- but I'll avoid speculating until you confirm the scope of the
> config.
>
> 5. Regarding the new behavior of electing the new preferred leader in the
> "first step" of the reassignment - does this obey the
> `auto.leader.rebalance.enable` config?
> If not, I have concerns regarding how backwards compatible this might be -
> e.g imagine a user does a huge reassignment (as they have always done) and
> suddenly a huge leader shift happens, whereas the user expected to manually
> shift preferred leaders at a slower rate via
> the kafka-preferred-replica-election.sh tool.
>
> 6. What is the expected behavior if we dynamically change one of the
> configs to a lower value while a reassignment is happening. Would we cancel
> some of the currently reassigned partitions or would we account for the new
> values on the next reassignment? I assume the latter but it's good to be
> explicit
>
> As some small nits:
> - could we have a section in the KIP where we explicitly define what each
> config does? This can be inferred from the KIP as is but requires careful
> reading, whereas some developers might want to skim through the change to
> get a quick sense. It also improves readability but that's my personal
> opinion.
> - could you better clarify how a reassignment step is different from the
> currently existing algorithm? maybe laying both algorithms out in the KIP
> would be most clear
> - the names for the OngoingPartitionReassignment and
> CurrentPartitionReassignment fields in the
> ListPartitionReassignmentsResponse are a bit confusing to me.
> Unfortunately, I don't have a better suggestion, but maybe somebody else in
> the community has.
>
> Thanks,
> Stanislav
>
> On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <viktorsomo...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I've renamed my KIP as its name was a bit confusing so we'll continue it in
> > this thread.
> > The previous thread for record:
> >
> > https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
> >
> > A short summary of the KIP:
> > In case of a vast partition reassignment (thousands of partitions at once)
> > Kafka can collapse under the increased replication traffic. This KIP will
> > mitigate it by introducing internal batching done by the controller.
> > Besides putting a bandwidth limit on the replication it is useful to batch
> > partition movements as fewer number of partitions will use the available
> > bandwidth for reassignment and they finish faster.
> > The main control handles are:
> > - the number of parallel leader movements,
> > - the number of parallel partition movements
> > - and the number of parallel replica movements.
> >
> > Thank you for the feedback and ideas so far in the previous thread and I'm
> > happy to receive more.
> >
> > Regards,
> > Viktor
> >
>
>
> --
> Best,
> Stanislav
