Hey Stanislav,

Thanks for the thorough look at the KIP! :)
> Let me first explicitly confirm my understanding of the configs and the
> algorithm:
> * reassignment.parallel.replica.count - the maximum total number of
> replicas that we can move at once, *per partition*
> * reassignment.parallel.partition.count - the maximum number of partitions
> we can move at once
> * reassignment.parallel.leader.movements - the maximum number of leader
> movements we can have at once

Yes.

> As far as I currently understand it, your proposed algorithm will naturally
> prioritize leader movement first. e.g if
> reassignment.parallel.replica.count=1 and
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> we will always move one, the first possible, replica in the replica set
> (which will be the leader if part of the excess replica set (ER)).
> Am I correct in saying that?

Yes.

> 1. Does it make sense to add `max` somewhere in the configs' names?

If you mean that it's not always an exact number (because the last batch
could be smaller), then I think it's a good idea. I'm fine either way: with
the current naming or with "max" in it.

> 2. How does this KIP play along with KIP-455's notion of multiple
> rebalances - do the configs apply on a
> single AlterPartitionAssignmentsRequest or are they global?

I think it is more intuitive if they are global; the user would then have
the option to set KIP-435's settings to Int.MAX, thus eliminating any
internal batching, and submit their own batches through
AlterPartitionAssignmentsRequest. If we applied the configs to every
request then we couldn't really guarantee any limits, as the user could get
around them by submitting lots of reassignments. By default, though, we set
the batching limits to Int.MAX, so effectively they're disabled.
Also, if I understand correctly, AlterPartitionAssignmentsRequest would be
partition-level batching, wouldn't it? So if you submit 10 partitions at
once, then as per my understanding they'll all be started by the controller
immediately.

> 3.
> Unless I've missed it, the algorithm does not take into account
> `reassignment.parallel.leader.movements`

Yes, the reassignment step calculation doesn't contain that, because it
describes only one partition's reassignment step. We simply fill the
reassignment batch with as many leader movements as we can and then, if we
have space left, fill the rest with reassignments which don't require
leader movement. I'll create a pseudo code block in the KIP for this.

> 4. The KIP says that the order of the input has some control over how the
> batches are created - i.e it's deterministic. What would the batches of the
> following reassignment look like:
> reassignment.parallel.replica.count=1
> reassignment.parallel.partition.count=MAX_INT
> reassignment.parallel.leader.movements=1
> partitionA - (0,1,2) -> (3, 4, 5)
> partitionB - (0,1,2) -> (3,4,5)
> partitionC - (0,1,2) -> (3, 4, 5)
> From my understanding, we would start with A(0->3), B(1->4) and C(1->4). Is
> that correct? Would the second step then continue with B(0->3)?
>
> If the configurations are global, I can imagine we will have a bit more
> trouble in preserving the expected ordering, especially across controller
> failovers -- but I'll avoid speculating until you confirm the scope of the
> config.

I've raised the ordering problem on the KIP-455 discussion in a somewhat
different form, and as I remember the verdict there was that we shouldn't
expose ordering as an API. As you say, it might not be easy to preserve,
and there might be much better strategies to follow (like disk or network
utilization goals), so I'll remove this section from the KIP.
(Otherwise yes, I would have expected the same behavior you described.)
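Just to illustrate the behavior we both described (even though the ordering
guarantee itself will be removed from the KIP), the first step of your
example could be sketched roughly like this; all names here are made up for
the sketch, not taken from the KIP:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the first step with replica.count=1,
// partition.count=MAX_INT and leader.movements=1: every partition moves
// exactly one replica, but only one partition may move the replica in the
// leader position (index 0); the rest move their first follower instead.
final class FirstStepSketch {
    record Move(String partition, int fromBroker, int toBroker) {}

    static List<Move> firstStep(List<String> partitions,
                                int[] currentReplicas,
                                int[] targetReplicas,
                                int maxLeaderMovements) {
        List<Move> moves = new ArrayList<>();
        int leaderMovements = 0;
        for (String p : partitions) {
            // Spend the leader-movement budget first, then fall back to
            // moving the first follower.
            int idx = leaderMovements < maxLeaderMovements ? 0 : 1;
            if (idx == 0) leaderMovements++;
            moves.add(new Move(p, currentReplicas[idx], targetReplicas[idx]));
        }
        return moves;
    }
}
```

With partitions A, B and C on (0,1,2) -> (3,4,5) and a leader movement
limit of 1, this yields A(0->3), B(1->4) and C(1->4), matching the first
step above.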
Since #3 was also about the pseudo code, it would be something like this:

Config:
  reassignment.parallel.replica.count=R
  reassignment.parallel.partition.count=P
  reassignment.parallel.leader.movements=L

Code:
  // split the individual partition reassignment steps based on whether
  // they involve leader movement or not
  val (leaderMovements, otherMovements) =
    calculateReassignmentStepsFor(partitionsToReassign)
      .partition(_.involvesLeaderMovement)
  // fill the batch with at most L leader movements and, if we have space
  // left, take the rest (up to P partitions in total) from the other
  // reassignments
  val leaderBatch = leaderMovements.take(math.min(L, P))
  val currentBatch = leaderBatch ++ otherMovements.take(P - leaderBatch.size)
  executeReassignmentOnBatch(currentBatch)

> 5. Regarding the new behavior of electing the new preferred leader in the
> "first step" of the reassignment - does this obey the
> `auto.leader.rebalance.enable` config?
> If not, I have concerns regarding how backwards compatible this might be -
> e.g imagine a user does a huge reassignment (as they have always done) and
> suddenly a huge leader shift happens, whereas the user expected to manually
> shift preferred leaders at a slower rate via
> the kafka-preferred-replica-election.sh tool.

I'm not sure I get this scenario. Are you saying that after they submitted
a reassignment they also submit a preferred leader change? What I have in
mind is:
i) make auto.leader.rebalance.enable obey the leader movement limit, as
this way it will be easier to calculate the reassignment batches;
ii) the user could submit a preferred leader election, but it's basically a
form of reassignment, so it would fall under the same batching criteria. If
the batch they submit is smaller than the internal limit then it'd be
executed immediately, otherwise it would be split into more batches.
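To make (ii) a bit more concrete, here is a rough sketch (the class and
method names are mine, not from the KIP) of how a preferred leader election
for N partitions could be split into batches bounded by the leader movement
limit:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a preferred leader election is treated as a set of
// leader-movement-only reassignments, so it is simply chopped into batches
// of at most reassignment.parallel.leader.movements partitions each. A
// request smaller than the limit fits into a single batch and runs
// immediately; a larger one is executed as several consecutive batches.
final class PreferredLeaderElectionBatching {
    static List<List<String>> splitIntoBatches(List<String> partitions,
                                               int maxLeaderMovements) {
        List<List<String>> batches = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i += maxLeaderMovements) {
            int end = Math.min(i + maxLeaderMovements, partitions.size());
            batches.add(new ArrayList<>(partitions.subList(i, end)));
        }
        return batches;
    }
}
```

With the default Int.MAX limit everything lands in a single batch, so the
user-visible behavior stays as it is today.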
It might be different behavior, as it may not be executed in one batch, but
I don't think that's a problem: we default the batch limits to Int.MAX, and
since a preferred leader election is a form of reassignment, I think it
makes sense to apply the same logic to it.

> 6. What is the expected behavior if we dynamically change one of the
> configs to a lower value while a reassignment is happening. Would we cancel
> some of the currently reassigned partitions or would we account for the new
> values on the next reassignment? I assume the latter but it's good to be
> explicit

Yep, it would be the latter. I'll make this explicit in the KIP too.

About the nits: noted, will update the KIP to reflect your suggestions :)

Viktor

On Fri, Jun 28, 2019 at 5:59 PM Stanislav Kozlovski <stanis...@confluent.io>
wrote:

> Hey there Viktor,
>
> Thanks for working on this KIP! I agree with the notion that reliability,
> stability and predictability of a reassignment should be a core feature of
> Kafka.
>
> Let me first explicitly confirm my understanding of the configs and the
> algorithm:
> * reassignment.parallel.replica.count - the maximum total number of
> replicas that we can move at once, *per partition*
> * reassignment.parallel.partition.count - the maximum number of partitions
> we can move at once
> * reassignment.parallel.leader.movements - the maximum number of leader
> movements we can have at once
>
> As far as I currently understand it, your proposed algorithm will naturally
> prioritize leader movement first. e.g if
> reassignment.parallel.replica.count=1 and
> reassignment.parallel.partition.count==reassignment.parallel.leader.movements,
> we will always move one, the first possible, replica in the replica set
> (which will be the leader if part of the excess replica set (ER)).
> Am I correct in saying that?
>
> Regarding the KIP, I've got a couple of comments/questions:
>
> 1. Does it make sense to add `max` somewhere in the configs' names?
>
> 2.
> How does this KIP play along with KIP-455's notion of multiple
> rebalances - do the configs apply on a
> single AlterPartitionAssignmentsRequest or are they global?
>
> 3. Unless I've missed it, the algorithm does not take into account
> `reassignment.parallel.leader.movements`
>
> 4. The KIP says that the order of the input has some control over how the
> batches are created - i.e it's deterministic. What would the batches of the
> following reassignment look like:
> reassignment.parallel.replica.count=1
> reassignment.parallel.partition.count=MAX_INT
> reassignment.parallel.leader.movements=1
> partitionA - (0,1,2) -> (3, 4, 5)
> partitionB - (0,1,2) -> (3,4,5)
> partitionC - (0,1,2) -> (3, 4, 5)
> From my understanding, we would start with A(0->3), B(1->4) and C(1->4). Is
> that correct? Would the second step then continue with B(0->3)?
>
> If the configurations are global, I can imagine we will have a bit more
> trouble in preserving the expected ordering, especially across controller
> failovers -- but I'll avoid speculating until you confirm the scope of the
> config.
>
> 5. Regarding the new behavior of electing the new preferred leader in the
> "first step" of the reassignment - does this obey the
> `auto.leader.rebalance.enable` config?
> If not, I have concerns regarding how backwards compatible this might be -
> e.g imagine a user does a huge reassignment (as they have always done) and
> suddenly a huge leader shift happens, whereas the user expected to manually
> shift preferred leaders at a slower rate via
> the kafka-preferred-replica-election.sh tool.
>
> 6. What is the expected behavior if we dynamically change one of the
> configs to a lower value while a reassignment is happening. Would we cancel
> some of the currently reassigned partitions or would we account for the new
> values on the next reassignment?
> I assume the latter but it's good to be
> explicit
>
> As some small nits:
> - could we have a section in the KIP where we explicitly define what each
> config does? This can be inferred from the KIP as is but requires careful
> reading, whereas some developers might want to skim through the change to
> get a quick sense. It also improves readability but that's my personal
> opinion.
> - could you better clarify how a reassignment step is different from the
> currently existing algorithm? maybe laying both algorithms out in the KIP
> would be most clear
> - the names for the OngoingPartitionReassignment and
> CurrentPartitionReassignment fields in the
> ListPartitionReassignmentsResponse are a bit confusing to me.
> Unfortunately, I don't have a better suggestion, but maybe somebody else in
> the community has.
>
> Thanks,
> Stanislav
>
> On Thu, Jun 27, 2019 at 3:24 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com> wrote:
>
> > Hi All,
> >
> > I've renamed my KIP as its name was a bit confusing so we'll continue it
> > in this thread.
> > The previous thread for the record:
> >
> > https://lists.apache.org/thread.html/0e97e30271f80540d4da1947bba94832639767e511a87bb2ba530fe7@%3Cdev.kafka.apache.org%3E
> >
> > A short summary of the KIP:
> > In case of a vast partition reassignment (thousands of partitions at
> > once) Kafka can collapse under the increased replication traffic. This
> > KIP will mitigate that by introducing internal batching done by the
> > controller.
> > Besides putting a bandwidth limit on the replication, it is useful to
> > batch partition movements, as a smaller number of partitions will use
> > the available bandwidth for reassignment and they finish faster.
> > The main control handles are:
> > - the number of parallel leader movements,
> > - the number of parallel partition movements
> > - and the number of parallel replica movements.
> >
> > Thank you for the feedback and ideas so far in the previous thread, and
> > I'm happy to receive more.
> >
> > Regards,
> > Viktor
>
>
> --
> Best,
> Stanislav