On Mon, Jul 15, 2019, at 14:31, Stanislav Kozlovski wrote: > Hey George, > > > Different replica threads for throttling > The reason we can't support throttling for reassigning partitions right now > is because we have no good way of telling whether a replica is part of an > ongoing reassignment or not. > KIP-455 now gives us this knowledge with the `addingReplicas` and > `removingReplicas` fields. I believe reassignment quotas can be implemented > cleanly once we have that information in the brokers, regardless of whether > they are in separate threads or not. > > In regards to the reassignment quota affecting non-reassignment traffic due > to sharing the replica fetcher threads, that's a non-issue. Leader/follower > quotas do not work the same way as client quotas. Kafka currently behaves > in the following way: > - if a follower quota is set and violated, the follower will not include > the violating partition in the next FetchRequest > - if a leader quota is set and violated, the leader will simply return no > records for that partition > > So the only benefits I see, might the ability to apply separate configs for > the separate use cases - normal replication and reassignment replication. I > think that might have value. > > Regardless, I think this is best left for another KIP. I suggest we > continue the discussion in another thread or perhaps even offline, as to > not derail this one. > > > So where does the new reassigned "targetReplicas" is stored? When all > replicas in ISR, and reassignment completes, it needs to update the > partition assignment with "targetReplicas" which has the exact replica > ordering, e.g. Preferred Leader, 2nd preferred leader, ....., not just by > removing the "removingReplicas" from the existing partition assignments? > The exact replica ordering will be preserved, even if we don't store the > targetReplicas anywhere explicitly. Through appropriate ordering inside the > full replica set, we can ensure that we end up with > replicas==targetReplicas once we empty out removingReplicas from the full > replica set. > > > Also I still hope this KIP-455 can provide a feature/solution of clean > cancel/rollback as much as possible. > I agree with Colin that we need not focus too much on optimizing > cancellation. > > > Current implementation is for the user to submit the list of current > reassignments json with a "--verify" option. It will check whether the > current partition assignments matches the one of the "targetReplicas" and > remove the throttles at the Topic Level and Broker Level > (leader.replication.throttled.rate/follower.replication.throttled.rate). KIP > -455 will remove Broker Level throttle only when no more reassignments in > the cluster? race conditions with another newly submitted reassignment > with throttle? actually general handling of race conditions of concurrent > submissions of reassignments which was guarded against before by the > /admin/reassign_partitions znode. > > That's a fair question. I think we should keep the behavior here the > same in KIP-455, to avoid changing too many things at once. The quotas > really could use improvement, but one thing at a time. > To me it sounds like trying to preserve this old behavior may be harder > than changing it. What if we were to simply: > 1. Remove replica-level quotas if there are no on-going reassignments for > that replica > 2. Remove broker-level quotas if there are no on-going reassignments for > that broker > This sounds intuitive to me
Hmm. I'm not sure why the old behavior would be hard to preserve. Quotas are totally separate from reassignments at the moment, so only the reassignment tool needs to be changed. I agree that this could be improved, but it would be nice to take a more holistic look at what we want to do with it (limit the quota only to reassignment traffic, etc., and maybe even remove the broker quota entirely if we don't need it.) Let's punt for now. best, Colin > > --- > Hey Jason, > > > Was it intentional to let the leader be responsible for shrinking the > ISR after RR is dropped from the replica set? Today the controller is > responsible for shrinking ISR. > No, it was not intentional. Good catch. I had personally learned that the > controller shrinks the ISR in such reassignments just the other day but did > not realize I had it the other way around in the KIP. We will preserve the > old behavior > > > On Mon, Jul 15, 2019 at 7:51 PM Jason Gustafson <ja...@confluent.io> wrote: > > > Hi Colin, > > > > A few more questions below: > > > > 1. The KIP says that the --zookeeper option will be removed from > > kafka-reassign.sh. Do you mean that it will be deprecated and eventually > > removed? > > > > 2. The KIP mentions that AR is shrunk iteratively. Does this have a > > benefit? The downside is that we lose track of the initial assignment, > > which means cancellation leaves the user with some additional cleanup in > > order to restore replication factors. I think it would be nice to avoid > > this unless there is a substantial benefit. It seems simpler and more > > consistent with current behavior just to drop AR at once when all replicas > > have caught up to the ISR. Later, in KIP-435, we could change this so that > > we can drop equal numbers of replicas from AR and RR at the same time as > > the new replicas come into sync. In this way, cancellation always leaves > > the system with the same replication factor. > > > > 3. Was it intentional to let the leader be responsible for shrinking the > > ISR after RR is dropped from the replica set? Today the controller is > > responsible for shrinking ISR. In this way, we can be sure that the replica > > set is always consistent with ISR which I think is an invariant we need to > > preserve. > > > > Thanks, > > Jason > > > > > > > > > > > > On Sat, Jul 13, 2019 at 5:54 PM George Li <sql_consult...@yahoo.com > > .invalid> > > wrote: > > > > > Hi Stanislav, > > > > > > sorry for the late reply. comments below: > > > > > > > Thanks for the reminder. A lot of your suggestions are outlined in the > > > > "Future Work" section of KIP-455. The pointer towards different > > > > ReplicaFetcher thread pools is interesting -- do you think there's > > > > much value in that? My intuition is that having appropriate quotas for > > > > the reassignment traffic is the better way to separate the two, > > > > whereas a separate thread pool might provide less of a benefit. > > > > > > > > > I think separating the Reassignment replication from the normal follower > > > traffic of the ReplicaFetcher Threads will give us some benefits: > > > > > > 1. The throttling on Reassignment traffic will be much cleaner to the > > > Reassignment replication threads, Right now, it's complicated using the > > > topic config (+broker level) for throttle. e.g. > > > > > > > > > Topic:georgeli_test PartitionCount:8 ReplicationFactor:4 > > > > > Configs:leader.replication.throttled.replicas=0:1026,0:1028,0:1025,follower.replication.throttled.replicas=0:1027 > > > Topic: georgeli_test Partition: 0 Leader: 1026 Replicas: > > > 1027,1025,1028,1026 Isr: 1026,1028,1025 > > > > > > > > > When trying to do reassignment (1026,1028,1025) ==> (1027,1025,1028), it > > > puts the partition#, brokers in the Topic config, if 1027 caught up in > > ISR, > > > and reassignment completed, 1027 could still be throttled. unless we > > > remove the throttle. If it's dedicated Reassignment replication thread, > > > the completed reassignment replica will move back to the normal > > > ReplicaFetcher threads. Making managing throttle much easier. > > > > > > > > > 2. One of the major issues in reassignments is the performance impact on > > > "lossless" type of cluster/topics, the producer latency spikes > > > (min.insync.replicas>1) even with throttling, the major contribution of > > > latency spike from RemoteTimeMs (replication). In theory, since > > > Reassignment traffic is sharing the same ReplicaFetcher threads, for > > large > > > partition, it will affect the existing Leader => Follower replication. > > Also > > > after separating, the reassignment replication can have different > > settings > > > to increase network throughput, etc. because it's pulling large amount of > > > historical data? The normal replication follower traffic is doing less > > > incremental data. Two different type of use cases. > > > > > > > > > > With regards to keeping the original replicas info before > > > > reassignments are kicked off - this KIP proposes that we store the > > > > `targetReplicas` in a different collection and thus preserve the > > > > original replicas info until the reassignment is fully complete. It > > > > should allow you to implement rollback functionality. Please take a > > > > look at the KIP and confirm if that is the case. It would be good to > > > > synergize both KIPs. > > > > > > I think the "targetReplicas" in ZK node /topics/<topic> is fine. But the > > > code (makeFollower()?) to determine where ReplicaFetcher should pull > > needs > > > a change, right now, it's using the partition current assignment > > replicas. > > > Current implementation is first updating OAR + RAR. I just submit > > > https://issues.apache.org/jira/browse/KAFKA-8663 for a minor improvement > > > request. > > > > > > I just take a look at the updated KIP-455 again. I noticed > > > this "targetReplicas" is removed and instead, put "addingReplicas" & > > > "removingReplicas". So where does the new reassigned "targetReplicas" is > > > stored? When all replicas in ISR, and reassignment completes, it needs to > > > update the partition assignment with "targetReplicas" which has the exact > > > replica ordering, e.g. Preferred Leader, 2nd preferred leader, ....., not > > > just by removing the "removingReplicas" from the existing partition > > > assignments? > > > > > > Also I still hope this KIP-455 can provide a feature/solution of clean > > > cancel/rollback as much as possible. It would be easier for the user. > > e.g. > > > (1, 2, 3) => (4, 5, 6), If ISR goes to (1,2,3,4), and cancel, since 1, > > 2, > > > 3 all in ISR, why not cleanly rollback to (1,2,3), but leave it as > > > (1,2,3,4)? having 4 replicas there will increase storage of the cluster, > > > and replicationFactor discrepancies among partitions of the same topic > > > (e.g. future expand partition operation of this topic will fail). With > > > "originalReplicas" & "targetReplicas", I think we can derive > > > "addingReplicas" & "removingReplicas". > > > > > > > > > Can KIP-455 elaborate a bit more detail on how to remove the reassignment > > > throttle ? Current implementation is for the user to submit the list of > > > current reassignments json with a "--verify" option. It will check > > whether > > > the current partition assignments matches the one of the "targetReplicas" > > > and remove the throttles at the Topic Level and Broker Level > > > (leader.replication.throttled.rate/follower.replication.throttled.rate). > > > KIP-455 will remove Broker Level throttle only when no more reassignments > > > in the cluster? race conditions with another newly submitted > > reassignment > > > with throttle? actually general handling of race conditions of > > concurrent > > > submissions of reassignments which was guarded against before by the > > > /admin/reassign_partitions znode. > > > > > > Overall, I think KIP-455 is a great improvement and direction to go. > > > > > > Thanks, > > > George > > > > > > On Tuesday, July 9, 2019, 03:14:52 AM PDT, Stanislav Kozlovski < > > > stanis...@confluent.io> wrote: > > > > > > Hey there everybody, > > > > > > I've edited the KIP. Here is a diff of the changes - > > > > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=112820260&selectedPageVersions=13&selectedPageVersions=11 > > > Specifically, > > > - new AdminAPI for listing ongoing reassignments for _a given list of > > > topics_. > > > - made AlterPartitionReassignmentsRequest#Topics and > > > AlterPartitionReassignmentsRequest#Partitions nullable fields, in > > > order to make the API more flexible in large cancellations > > > - made ListPartitionReassignmentsRequest#PartitionIds nullable as > > > well, to support listing every reassigning partition for a given topic > > > without the need to specify each partition > > > - explicitly defined what a reassignment cancellation means > > > - laid out the algorithm for changing the state in ZK (this was > > > spelled out by Colin in the previous thread) > > > - mention expected behavior when reassigning during software > > > upgrade/downgrades > > > - mention the known edge case of using both APIs at once during a > > > controller failover > > > > > > I look forward to feedback. > > > > > > Hey George, > > > > Regardless of KIP-236 or KIP-455, I would like stress the importance > > of > > > keeping the original replicas info before reassignments are kicked off. > > > This original replicas info will allow us to distinguish what replicas > > are > > > currently being reassigned, so we can rollback to its original state. > > > > Also this will opens up possibility to separate the ReplicaFetcher > > > traffic of normal follower traffic from Reassignment traffic, also the > > > metrics reporting URP, MaxLag, TotalLag, etc. right now, Reassignment > > > traffic and normal follower traffic shared the same ReplicaFetcher > > threads > > > pool. > > > > > > Thanks for the reminder. A lot of your suggestions are outlined in the > > > "Future Work" section of KIP-455. The pointer towards different > > > ReplicaFetcher thread pools is interesting -- do you think there's > > > much value in that? My intuition is that having appropriate quotas for > > > the reassignment traffic is the better way to separate the two, > > > whereas a separate thread pool might provide less of a benefit. > > > > > > With regards to keeping the original replicas info before > > > reassignments are kicked off - this KIP proposes that we store the > > > `targetReplicas` in a different collection and thus preserve the > > > original replicas info until the reassignment is fully complete. It > > > should allow you to implement rollback functionality. Please take a > > > look at the KIP and confirm if that is the case. It would be good to > > > synergize both KIPs. > > > > > > > > > Thanks, > > > Stanislav > > > > > > > > > On Tue, Jul 9, 2019 at 12:43 AM George Li > > > <sql_consult...@yahoo.com.invalid> wrote: > > > > > > > > > Now that we support multiple reassignment requests, users may add > > > execute> them incrementally. Suppose something goes horribly wrong and > > they > > > want to> revert as quickly as possible - they would need to run the tool > > > with> multiple rollback JSONs. I think that it would be useful to have > > an > > > easy> way to stop all ongoing reassignments for emergency situations. > > > > > > > > KIP-236: Interruptible Partition Reassignment is exactly trying to > > > cancel the pending reassignments cleanly/safely in a timely fashion. > > It's > > > possible to cancel/rollback the reassignments not yet completed if the > > > original replicas before reassignment is saved somewhere. e.g. the > > > /admin/reassign_partitions znode, the Controller's ReassignmentContext > > > memory struct. > > > > > > > > I think a command line option like "kafka-reassign-partitions.sh > > > --cancel" would be easier for the user to cancel whatever pending > > > reassignments going on right now. no need to find the rollback json > > files > > > and re-submit them as reassignments. > > > > > > > > Regardless of KIP-236 or KIP-455, I would like stress the importance > > of > > > keeping the original replicas info before reassignments are kicked off. > > > This original replicas info will allow us to distinguish what replicas > > are > > > currently being reassigned, so we can rollback to its original state. > > Also > > > this will opens up possibility to separate the ReplicaFetcher traffic of > > > normal follower traffic from Reassignment traffic, also the metrics > > > reporting URP, MaxLag, TotalLag, etc. right now, Reassignment traffic and > > > normal follower traffic shared the same ReplicaFetcher threads pool. > > > > > > > > Thanks, > > > > George > > > > > > > > On Tuesday, July 2, 2019, 10:47:55 AM PDT, Stanislav Kozlovski < > > > stanis...@confluent.io> wrote: > > > > > > > > Hey there, I need to start a new thread on KIP-455. I think there > > might > > > be > > > > an issue with the mailing server. For some reason, my replies to the > > > > previous discussion thread could not be seen by others. After numerous > > > > attempts, Colin suggested I start a new thread. > > > > > > > > Original Discussion Thread: > > > > > > > > > https://sematext.com/opensee/m/Kafka/uyzND1Yl7Er128CQu1?subj=+DISCUSS+KIP+455+Create+an+Administrative+API+for+Replica+Reassignment > > > > KIP: > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment > > > > Last Reply of Previous Thread: > > > > > > > > > http://mail-archives.apache.org/mod_mbox/kafka-dev/201906.mbox/%3C679a4c5b-3da6-4556-bb89-e680d8cbb705%40www.fastmail.com%3E > > > > > > > > The following is my reply: > > > > ---- > > > > Hi again, > > > > > > > > This has been a great discussion on a tricky KIP. I appreciate > > > everybody's > > > > involvement in improving this crucial API. > > > > That being said, I wanted to apologize for my first comment, it was a > > bit > > > > rushed and not thought out. > > > > > > > > I've got a few questions now that I dove into this better: > > > > > > > > 1. Does it make sense to have an easy way to cancel all ongoing > > > > reassignments? To cancel all ongoing reassignments, users had the crude > > > > option of deleting the znode, bouncing the controller and running the > > > > rollback JSON assignment that kafka-reassign-partitions.sh gave them > > > > (KAFKA-6304). > > > > Now that we support multiple reassignment requests, users may add > > execute > > > > them incrementally. Suppose something goes horribly wrong and they want > > > to > > > > revert as quickly as possible - they would need to run the tool with > > > > multiple rollback JSONs. I think that it would be useful to have an > > easy > > > > way to stop all ongoing reassignments for emergency situations. > > > > > > > > --------- > > > > > > > > 2. Our kafka-reassign-partitions.sh tool doesn't seem to currently let > > > you > > > > figure out the ongoing assignments - I guess we expect people to use > > > > kafka-topics.sh for that. I am not sure how well that would continue to > > > > work now that we update the replica set only after the new replica > > joins > > > > the ISR. > > > > Do you think it makes sense to add an option for listing the current > > > > reassignments to the reassign tool as part of this KIP? > > > > > > > > We might want to think whether we want to show the TargetReplicas > > > > information in the kafka-topics command for completeness as well. That > > > > might involve the need to update the DescribeTopicsResponse. > > Personally I > > > > can't see a downside but I haven't given it too much thought. I fully > > > agree > > > > that we don't want to add the target replicas to the full replica set > > and > > > > nothing useful comes out of telling users they have a replica that > > might > > > > not have copied a single byte. Yet, telling them that we have the > > > intention > > > > of copying bytes sounds useful so maybe having a separate column in > > > > kafka-topics.sh would provide better clarity? > > > > > > > > --------- > > > > > > > > 3. What happens if we do another reassignment to a partition while one > > is > > > > in progress? Do we overwrite the TargetReplicas? > > > > In the example sequence you gave: > > > > R: [1, 2, 3, 4, 5, 6], I: [1, 2, 3, 4, 5, 6], T: [4, 5, 6] > > > > What would the behavior be if a new reassign request came with > > > > TargetReplicas of [7, 8, 9] for that partition? > > > > > > > > To avoid complexity and potential race conditions, would it make sense > > to > > > > reject a reassign request once one is in progress for the specific > > > > partition, essentially forcing the user to cancel it first? > > > > Forcing the user to cancel has the benefit of being explicit and > > guarding > > > > against human mistakes. The downside I can think of is that in some > > > > scenarios it might be inefficient, e.g > > > > R: [1, 2, 3, 4, 5, 6], I: [1, 2, 3, 4, 5, 6], T: [4, 5, 6] > > > > Cancel request sent out. Followed by a new reassign request with > > > > TargetReplicas of [5, 6, 7] (note that 5 and 6 already fully copied the > > > > partition). Becomes a bit of a race condition of whether we deleted the > > > > partitions in between requests or not - I assume in practice this won't > > > be > > > > an issue. I still feel like I prefer the explicit cancellation step > > > > > > > > --------- > > > > > > > > 4. My biggest concern - I want to better touch on the interaction > > between > > > > the new API and the current admin/reassign_partitions znode, the > > > > compatibility and our strategy there. > > > > The KIP says: > > > > > > > > > For compatibility purposes, we will continue to allow assignments to > > be > > > > > submitted through the /admin/reassign_partitions node. Just as with > > the > > > > > current code, this will only be possible if there are no current > > > > > assignments. In other words, the znode has two states: empty and > > > waiting > > > > > for a write, and non-empty because there are assignments in progress. > > > Once > > > > > the znode is non-empty, further writes to it will be ignored. > > > > > > > > Given the current proposal, I can think of 4 scenarios I want to get a > > > > better understanding of: > > > > > > > > *(i, ii, iii, iiii talk about the reassignment of the same one > > partition > > > > only - partitionA)* > > > > > > > > i. znode is empty, new reassignment triggered via API, znode is updated > > > > When the new reassignment is triggered via the API, do we create the > > > znode > > > > or do we allow a separate tool to trigger another reassignment through > > > it? > > > > > > > > ii. (assuming we allow creating the znode as with scenario "i"): znode > > is > > > > empty, new reassignment triggered via API, znode is updated, znode is > > > > DELETED > > > > My understand is that deleting the znode does not do anything until the > > > > Controller is bounced - is that correct? > > > > If so, this means that nothing will happen. If the Controller is > > bounced, > > > > the reassignment state will still be live in the [partitionId]/state > > > znode > > > > > > > > iii. znode is updated, new reassignment triggered via API > > > > We override the reassignment for partitionA. The reassign_partitions > > > znode > > > > is showing stale data, correct? > > > > > > > > iiii. znode is updated, new reassignment triggered via API, controller > > > > failover > > > > What does the controller believe - the [partitionId]/state znode or the > > > > /reassign_partitions ? I would assume the [partitionId]/state znode > > since > > > > in this case we want the reassignment API call to be the correct one. I > > > > think that opens up the possibility of missing a freshly-set > > > > /reassign_partitions though (e.g if it was empty and was set right > > during > > > > controller failover) > > > > > > > > iiiii. znode is updated to move partitionA, new reassignment triggered > > > via > > > > API for partitionB, partitionA move finishes > > > > At this point, do we delete the znode or do we wait until the > > partitionB > > > > move finishes as well? > > > > > > > > From the discussion here: > > > > > > > > > There's no guarantee that what is in the znode reflects the current > > > > > reassignments that are going on. The only thing you can know is that > > > if > > > > > the znode exists, there is at least one reassignment going on. > > > > > > > > This is changing the expected behavior of a tool that obeys Kafka's > > > current > > > > behavior though. It is true that updating the znode while a > > reassignment > > > is > > > > in progress has no effect but make ZK misleading but tools might have > > > grown > > > > to follow that rule and only update the znode once it is empty. I think > > > we > > > > might want to be more explicit when making such changes - I had seen > > > > discontentment in the community from the fact that we had changed the > > > znode > > > > updating behavior in a MINOR pull request. > > > > > > > > I feel it is complex to support both APIs and make sure we don't have > > > > unhandled edge cases. I liked Bob's suggestion on potentially allowing > > > only > > > > one via a feature flag: > > > > > > > > > Could we temporarily support > > > > > both, with a config enabling the new behavior to prevent users from > > > trying > > > > > to use both mechanisms (if the config is true, the old znode is > > > ignored; if > > > > > the config is false, the Admin Client API returns an error indicating > > > that > > > > > it is not enabled)? > > > > > > > > Perhaps it makes sense to discuss that possibility a bit more? > > > > > > > > --------- > > > > > > > > 5. ListPartitionReassignments filtering > > > > > > > > I guess the thought process here is that most reassignment tools want > > to > > > > > know about all the reassignments that are going on. If you don't > > know > > > all > > > > > the pending reassignments, then it's hard to say whether adding a new > > > one > > > > > is a good idea, or cancelling an existing one. So I guess I can't > > > think of > > > > > a case where a reassignment tool would want a partial set rather than > > > the > > > > > full one. > > > > > > > > > > > > I agree with Jason about the UIs having "drill into" options. I believe > > > we > > > > should support some sort of ongoing reassignment filtering at the topic > > > > level (that's the administrative concept people care about). > > > > An example of a tool that might leverage it is our own > > > > kafka-reassign-partitions.sh. You can ask that tool to generate a > > > > reassignment for you from a given list of topics. It currently uses > > > > `KafkaZkClient#getReplicaAssignmentForTopics()` to get the current > > > > assignment for the given topics. It would be better if it could use the > > > new > > > > ListPartitionsReassignments API to both figure out the current replica > > > > assignments and whether or not those topics are being reassigned (it > > > could > > > > log a warning that a reassignment is in progress for those topics). > > > > > > > > --------- > > > > > > > > and a small nit: We also need to update > > > > the ListPartitionReassignmentsResponse with the decided > > > > current/targetReplicas naming > > > > > > > > Thanks, > > > > Stanislav > > > > > > > > > > > > > > > > -- > > > Best, > > > Stanislav > > > > > -- > Best, > Stanislav >