Thanks for the KIP, making reassignment more flexible is definitely
welcome. As others have mentioned, I think we need to do it via the Kafka
protocol and not via ZK. The latter introduces an implicit API that other
tools will depend on causing migration challenges. This has already
happened with the existing ZK based interface and we should avoid
introducing more tech debt here.

Ismael

On Sat, Mar 23, 2019, 12:09 PM Colin McCabe <cmcc...@apache.org> wrote:

> On Thu, Mar 21, 2019, at 20:51, George Li wrote:
> >  Hi Colin,
> >
> > I agree with your proposal of having administrative APIs through RPC
> > instead of ZooKeeper. But seems like it will incur significant changes
> > to both submitting reassignments and this KIP's cancelling pending
> > reassignments.
> >
> > To make this KIP simple and moving along, I will be happy to do another
> > follow-up KIP to change all reassignment related operations via RP
>
> Thanks, George.  I think doing it as a two-step process is fine, but I
> suspect it would be much easier and quicker to do the RPC conversion first,
> and the interruptible part later.  The reason is because a lot of the
> things that people have brought up as concerns with this KIP are really
> issues with the API (how will people interact with ZK, how does access
> control work, what does the format look like in ZK) that will just go away
> once we have an RPC.
>
> > Just curious,  KIP-4 includes Topics/ACL related operations. In
> > addition to Reassignments,  any other operations should be done via
> > RPC?
>
> I think all of the administrative shell scripts have been converted except
> kafka-configs.sh.  I believe there is a KIP for that conversion.
> Reassigning partitions is probably the biggest KIP-4 gap we have right now.
>
> best,
> Colin
>
> >
> > Thanks,
> > George
> >
> >
> >     On Wednesday, March 20, 2019, 5:28:59 PM PDT, Colin McCabe
> > <co...@cmccabe.xyz> wrote:
> >
> >  Hi George,
> >
> > One big problem here is that administrative APIs should be done through
> > RPCs, not through ZooKeeper.  KIP-4 (Command line and centralized
> > administrative operations) describes the rationale for this.  We want
> > public and stable APIs that don't depend on the internal representation
> > of stuff in ZK, which will change over time.  Tools shouldn't have to
> > integrate with ZK or understand the internal data structures of Kafka
> > to make administrative changes.  ZK doesn't have a good security,
> > access control, or compatibility story.
> >
> > We should create an official reassignment RPC for Kafka.  This will
> > solve many of the problems discussed in this thread, I think.  For
> > example, with an RPC, users won't be able to make changes unless they
> > have ALTER on KafkaCluster.  That avoids the problem of random users
> > making changes without the administrator knowing.  Also, if multiple
> > users are making changes, there is no risk that they will overwrite
> > each other's changes, since they won't be modifying the internal ZK
> > structures directly.
> >
> > I think a good reassignment API would be something like:
> >
> > > ReassignPartitionsResults reassignPartitions(Map<TopicPartition,
> PartitionAssignment> reassignments);
> > >
> > > class PartitionAssignment {
> > >  List<Integer> nodes;
> > > }
> > >
> > > class ReassignPartitionsResults {
> > >  Map<TopicPartition, KafkaFuture<Void>> pending;
> > >  Map<TopicPartition, KafkaFuture<Void>> completed;
> > >  Map<TopicPartition, KafkaFuture<Void>> rejected;
> > > }
> > >
> > > PendingReassignmentResults pendingReassignments();
> > >
> > > class PendingReassignmentResults {
> > >  KafkaFuture<Map<TopicPartition, PartitionAssignment>> pending;
> > >  KafkaFuture<Map<TopicPartition, PartitionAssignment>> previous;
> > > }
> >
> > best,
> > Colin
> >
> >
> > On Tue, Mar 19, 2019, at 15:04, George Li wrote:
> > >  Hi Viktor,
> > >
> > > Thanks for the review.
> > >
> > > If there is reassignment in-progress while the cluster is upgraded
> with
> > > this KIP (upgrade the binary and then do a cluster rolling restart of
> > > the brokers), the reassignment JSON in Zookeeper
> > > /admin/reassign_partitions will only have  {topic, partition,
> > > replicas(new)} info when the batch of reassignment was kicked off
> > > before the upgrade,  not with the "original_replicas" info per
> > > topic/partition.  So when the user is trying to cancel/rollback the
> > > reassignments, it's going to fail and the cancellation will be skipped
> > > (The code in this KIP will check the if the "original_replicas" is in
> > > the /admin/reassign_partition).
> > >
> > > The user either has to wait till current reassignments to finish or
> > > does quite some manual work to cancel them (delete ZK node, bounce
> > > controller, re-submit reassignments with original replicas to
> rollback,
> > > if the original replicas are kept before the last batch of
> > > reassignments were submitted).
> > >
> > > I think this scenario of reassignments being kicked off by end-user,
> > > not by the team(s) that managed Kafka infrastructure might be rare
> > > (maybe in some very small companies?),  since only one batch of
> > > reassignments can be running at a given time in
> > > /admin/reassign_partitions.  The end-users need some co-ordination for
> > > submitting reassignments.
> > >
> > > Thanks,
> > > George
> > >
> > >
> > >    On Tuesday, March 19, 2019, 3:34:20 AM PDT, Viktor Somogyi-Vass
> > > <viktorsomo...@gmail.com> wrote:
> > >
> > >  Hey George,
> > > Thanks for the answers. I'll try to block out time this week to review
> > > your PR.
> > > I have one more point to clarify:I've seen some customers who are
> > > managing Kafka as an internal company-wide service and they may or may
> > > not know that how certain topics are used within the company. That
> > > might mean that some clients can start reassignment at random
> > > times. Let's suppose that such a reassignment starts just when the
> > > Kafka operation team starts upgrading the cluster that contains this
> > > KIP. The question is: do you think that we need handle upgrade
> > > scenarios where there is an in-progress reassignment?
> > > Thanks,
> > > Viktor
> > >
> > > On Tue, Mar 19, 2019 at 6:16 AM George Li <sql_consult...@yahoo.com>
> wrote:
> > >
> > >  Hi Viktor,
> > > FYI, I have added a new ducktape
> > > test:  tests/kafkatest/tests/core/reassign_cancel_test.py
> > > to https://github.com/apache/kafka/pull/6296
> > > After review, do you have any more questions?  Thanks
> > >
> > > Hi Jun,
> > > Could you help review this when you have time?  Thanks
> > >
> > > Can we start a vote on this KIP in one or two weeks?
> > >
> > > Thanks,George
> > >
> > >
> > >    On Tuesday, March 5, 2019, 10:58:45 PM PST, George Li
> > > <sql_consult...@yahoo.com> wrote:
> > >
> > >  Hi Viktor,
> > >
> > >
> > > >  2.: One follow-up question: if the reassignment cancellation gets
> interrupted and a failover happens after step #2 but before step #3, how
> will the new controller continue? At this stage Zookeeper would contain
> OAR + RAR, however the brokers will have the updated LeaderAndIsr about
> OAR, so they won't know about RAR. I would suppose the new controller would
> start from the beginning as it only knows what's in Zookeeper. Is that true?
> > >
> > > The OAR (Original Assigned Replicas) for the rollback is stored in the
> > > /admin/reassign_partitions for each topic/partition reassignments.
> > > During the controller failover,  the new controller will read
> > > /admin/reassign_partitions for both new_replicas AND original_replicas
> > > into controllerContext.partitionsBeingReassigned
> > >
> > > then perform pending reassignments cancellation/rollback.
> > > The originalReplicas  is added below:
> > > case class ReassignedPartitionsContext(var newReplicas: Seq[Int] =
> > > Seq.empty,
> > >                                        var originalReplicas: Seq[Int]=
> > > Seq.empty,
> > >                                        val reassignIsrChangeHandler:
> > > PartitionReassignmentIsrChangeHandler) {
> > >
> > >
> > > > 2.1: Another interesting question that are what are those replicas
> are doing which are online but not part of the leader and ISR? Are they
> still replicating? Are they safe to lie around for the time being?
> > >
> > > I think they are just dangling without being replicated.  Upon
> > > LeaderAndIsr request, the makeFollowers()
> > > will replicaFetcherManager.removeFetcherForPartitions() and it will
> not
> > > be added in ReplicaFetcher since they are not in the current AR
> > > (Assigned Replicas).   Step 4 (StopReplica) will delete them.
> > >
> > >
> > > I am adding a new ducktape test:
> > > tests/kafkatest/tests/core/reassign_cancel_test.py for cancelling
> > > pending reassignments.  but it's not easy to simulate controller
> > > failover during the cancellation since the cancellation/rollback is so
> > > fast to complete. See below.  I do have a unit/integration test to
> > > simulate controller
> > > failover:  shouldTriggerReassignmentOnControllerStartup()
> > >
> > >
> > > Thanks,George
> > >
> > >
> > >
> > > Warning: You must run Verify periodically, until the reassignment
> > > completes, to ensure the throttle is removed. You can also alter the
> > > throttle by rerunning the Execute command passing a new value.The
> > > inter-broker throttle limit was set to 1024 B/sSuccessfully started
> > > reassignment of partitions.
> > > [INFO  - 2019-03-05 04:20:48,321 - kafka - execute_reassign_cancel -
> > > lineno:514]: Executing cancel / rollback pending partition
> > > reassignment...[DEBUG - 2019-03-05 04:20:48,321 - kafka -
> > > execute_reassign_cancel - lineno:515]:
> > > /opt/kafka-dev/bin/kafka-reassign-partitions.sh --zookeeper
> > > worker1:2181 --cancel[DEBUG - 2019-03-05 04:20:48,321 - remoteaccount
> -
> > > _log - lineno:160]: vagrant@worker2: Running ssh command:
> > > /opt/kafka-dev/bin/kafka-reassign-partitions.sh --zookeeper
> > > worker1:2181 --cancel[DEBUG - 2019-03-05 04:20:57,452 - kafka -
> > > execute_reassign_cancel - lineno:520]: Verify cancel / rollback
> pending
> > > partition reassignment:Rolling back the current pending reassignments
> > > Map(test_topic-7 -> Map(replicas -> Buffer(3, 2, 5), original_replicas
> > > -> Buffer(3, 2, 4)), test_topic-18 -> Map(replicas -> Buffer(5, 1, 2),
> > > original_replicas -> Buffer(4, 1, 2)), test_topic-3 -> Map(replicas ->
> > > Buffer(5, 2, 3), original_replicas -> Buffer(4, 2, 3)), test_topic-15
> > > -> Map(replicas -> Buffer(1, 3, 5), original_replicas -> Buffer(1, 3,
> > > 4)), test_topic-11 -> Map(replicas -> Buffer(2, 3, 5),
> > > original_replicas -> Buffer(2, 3, 4)))Successfully submitted
> > > cancellation of reassignments.The cancelled pending reassignments
> > > throttle was removed.Please run --verify to have the previous
> > > reassignments (not just the cancelled reassignments in progress)
> > > throttle removed.
> > >
> > >
> > >
> > >
> > >    On Tuesday, March 5, 2019, 8:20:25 AM PST, Viktor Somogyi-Vass
> > > <viktorsomo...@gmail.com> wrote:
> > >
> > >  Hey George,
> > > Sorry for the delay.I'll answer point-by-point:1.2: I think it's fine.
> > > As you say we presume that the client knows the state of the cluster
> > > before doing the reassignment, so we can presume the same for
> > > cancellation.2.: One follow-up question: if the reassignment
> > > cancellation gets interrupted and a failover happens after step #2 but
> > > before step #3, how will the new controller continue? At this stage
> > > Zookeeper would contain OAR + RAR, however the brokers will have the
> > > updated LeaderAndIsr about OAR, so they won't know about RAR. I would
> > > suppose the new controller would start from the beginning as it only
> > > knows what's in Zookeeper. Is that true?2.1: Another interesting
> > > question that are what are those replicas are doing which are online
> > > but not part of the leader and ISR? Are they still replicating? Are
> > > they safe to lie around for the time being?
> > > Best,
> > > Viktor
> > >
> > > On Fri, Mar 1, 2019 at 8:40 PM George Li <sql_consult...@yahoo.com>
> wrote:
> > >
> > >  Hi Jun,
> > > Could you help review KIP-236 when you have time?  Thanks.
> > >
> > >
> > > Hi Becket,
> > > Since you filed https://issues.apache.org/jira/browse/KAFKA-6304 to
> > > request this feature.  Could you also help review and comment on
> > > KIP-236 ?  Thanks.
> > >
> > >
> > >
> > > Hi Viktor,
> > > I have updated https://github.com/apache/kafka/pull/6296  and also
> > > KIP-236:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment
>  Please
> see below new section "Skip Reassignment Cancellation Scenarios":
> > >
> > > Skip Reassignment Cancellation Scenarios
> > >
> > > There are a couple scenarios that the Pending reassignments
> > > in /admin/reassign_partitions can not be cancelled / rollback.
> > >
> > >    - If the "original_replicas"  is missing for the topic/partition
> > > in /admin/reassign_partitions .  In this case, the pending
> reassignment
> > > cancelled will be skipped.  Because there is no way to reset to the
> > > original replicas.  The reasons this can happened  could be:
> > >      - if either the user/client is
> > > tampering /admin/reassign_partitions directly, and does not have the
> > > "original_replicas" for the topic
> > >      - if the user/client is using incorrect versions of the admin
> > > client to submit for reassignments.   The Kafka software should be
> > > upgraded not just for all the brokers in the cluster.  but also on the
> > > host that is used to submit reassignments.
> > >
> > >
> > >
> > >    - If all the "original_replicas" brokers are not in ISR,  and some
> > > brokers in the "new_replicas" are not offline for the topic/partition
> > > in the pending reassignments.   In this case, it's better to skip this
> > > topic's pending reassignment  cancellation/rollback,  otherwise, it
> > > will become offline.  However,  if all the brokers in
> > > "original_replicas" are offline  AND  all the brokers in
> "new_replicas"
> > > are also offline for this topic/partition,  then the cluster is in
> such
> > > a bad state, the topic/partition is currently offline anyway,  it will
> > > cancel/rollback this topic pending reassignments back to the
> > > "original_replicas".
> > >
> > >
> > >
> > >
> > >
> > > What other scenarios others can think of that reassignment
> cancellation
> > > should be skipped?  Thanks
> > >
> > >
> > > Hi All,
> > >
> > > Another issues I would like to raise is the removing of throttle for
> > > the Cancelled Reassignments.  Currently the remove throttle code is in
> > > the Admin Client.  Since the pending reassignments are cancelled
> > > /rollback,  the throttle would not be removed by running admin client
> > > with --verify option to remove the throttle.  My approached is to
> > > remove the throttle in the admin client after the reassignments
> > > cancellation.  But I feel it's better to move this in Controller (in
> > > controller failover scenario).
> > >
> > >
> > > Thanks,
> > > George
> > >
> > >
> > >
> > >    On Monday, February 25, 2019, 11:40:08 AM PST, George Li
> > > <sql_consult...@yahoo.com> wrote:
> > >
> > >  Hi Viktor,
> > > Thanks for the response.  Good questions!  answers below:
> > > > A few questions regarding the rollback algorithm:> 1. At step 2 how
> do you elect the leader?
> > >
> > > The step 2 code is in Enable pending reassignments
> > > cancellation/rollback by sql888 · Pull Request #6296 ·
> > >
> apache/kafka  core/src/main/scala/kafka/controller/KafkaController.scala 
> line#622
> > >
> > > |
> > > |
> > > |
> > > |  |  |
> > >
> > >  |
> > >
> > >  |
> > > |
> > > |  |
> > > Enable pending reassignments cancellation/rollback by sql888 · Pull
> Requ...
> > >
> > > Enable pending reassignments (reassignments still in-flight in
> > > /admin/reassign_partitions) cancellation/rollback...
> > >  |
> > >
> > >  |
> > >
> > >  |
> > >
> > >
> > >
> > > rollbackReassignedPartitionLeaderIfRequired(topicPartition,
> > > reassignedPartitionContext)During "pending" reassignment, e.g.
> (1,2,3)
> > > => (4,2,5)  normally, the leader (in this case broker_id 1) will
> remain
> > > as the leader until all replicas (1,2,3,4,5) in ISR, then the leader
> > > will be switched to 4.  However, in one scenario, if let's say new
> > > replica 4 is already caught up in ISR, and somehow original leader 1
> is
> > > down or bounced.  4 could become the new
> > > leader. rollbackReassignedPartitionLeaderIfRequired() will do a
> > > leadership election using
> > > PreferredReplicaPartitionLeaderElectionStrategy
> > >  among brokers in OAR (Original Assigned Replicas set in memory). >
> > > 1.1. Would it be always the original leader?
> > > Not necessarily,  if the original preferred leader is down, it can be
> > > other brokers in OAR which are in ISR.
> > > > 1.2. What if some brokers that are in OAR are down?
> > > If some brokers in OAR are down, the topic/partition will have URP
> > > (Under Replicated Partition). The client deciding to do reassignment
> > > should be clear what the current state of the cluster is, what brokers
> > > are down, what are up, what reassignment is trying to accomplish. e.g.
> > > reassignment from down brokers to new brokers(?)
> > >
> > > > 2. I still have doubts that we need to do the reassignment backwards
> during rollback. For instance if we decide to cancel the reassignment at
> step > #8 where replicas in OAR - RAR are offline and start the rollback,
> then how do we make a replica from OAR online again before electing a
> leader as described in step #2 of the rollback algorithm?> 3. Does the
> algorithm defend against crashes? Is it able to continue after a controller
> failover?
> > > > 4. I think it would be a good addition if you could add few example
> scenarios for rollback.
> > >
> > > yes. shouldTriggerReassignCancelOnControllerStartup() is the
> > > integration test to simulate controller failover while cancelling
> > > pending reassignments.  I will try to add controller failover scenario
> > > in a ducktape system test.
> > > You do raise a good point here. If the cluster is in a very BAD
> shape,
> > > e.g. None of the OAR brokers are online,  but some new broker in RAR
> is
> > > in ISR and is current leader, it make senses not to rollback to keep
> > > that topic/partition online. However, if none of brokers in  RAR is
> > > online, it may make sense to rollback to OAR and remove it from
> > > /admin/reassign_partitions, since the cluster state is already so bad,
> > > that topic/partition is offline anyway no matter rollback or not.
> > > I will add a check before cancel/rollback a topic/partition's pending
> > > reassignment by checking whether at least one broker of OAR is in ISR,
> > > so that it can be elected as leader,  if not, skip that
> topic/partition
> > > reassignment cancellation and raise an exception.
> > > I will list a few more scenarios for rollback.
> > > What additional scenarios for rollback you and others can think of?
> > >
> > > Thanks,George
> > >
> > >    On Monday, February 25, 2019, 3:53:33 AM PST, Viktor Somogyi-Vass
> > > <viktorsomo...@gmail.com> wrote:
> > >
> > >  Hey George,
> > > Thanks for the prompt response, it makes sense. I'll try to keep your
> > > code changes on top of my list and help reviewing that. :)Regarding
> the
> > > incremental reassignment: I don't mind either to discuss it as part of
> > > this or in a separate conversation but I think a separate one could be
> > > better because both discussions can be long and keeping them separated
> > > would limit the scope and make them more digestible and focused. If
> the
> > > community decides to discuss it here then I think I'll put KIP-435 on
> > > hold or rejected and add my ideas here. If the community decides to
> > > discuss it in a different KIP I think it's a good idea to move the
> > > planned future work part into KIP-435 and rework that. Maybe we can
> > > co-author it as I think both works could be complementary to each
> > > other. In any case I'd be absolutely interested in what others think.
> > > A few questions regarding the rollback algorithm:1. At step 2 how do
> > > you elect the leader? 1.1. Would it be always the original
> leader? 1.2.
> > > What if some brokers that are in OAR are down?2. I still have doubts
> > > that we need to do the reassignment backwards during rollback. For
> > > instance if we decide to cancel the reassignment at step #8 where
> > > replicas in OAR - RAR are offline and start the rollback, then how do
> > > we make a replica from OAR online again before electing a leader as
> > > described in step #2 of the rollback algorithm?3. Does the algorithm
> > > defend against crashes? Is it able to continue after a controller
> > > failover?4. I think it would be a good addition if you could add few
> > > example scenarios for rollback.
> > > Best,
> > > Viktor
> > >
> > > On Fri, Feb 22, 2019 at 7:04 PM George Li <sql_consult...@yahoo.com>
> wrote:
> > >
> > >  Hi Viktor,
> > >
> > > Thanks for reading and provide feedbacks on KIP-236.
> > >
> > > For reassignments, one can generate a json for new assignments and
> > > another json with "original" assignments for rollback purpose.  In
> > > production cluster, from our experience, we need to submit the
> > > reassignments in batches with throttle/staggering to minimize the
> > > impact to the cluster.  Some large topic/partition couple with
> throttle
> > > can take pretty long time for the new replica to be in ISR to complete
> > > reassignment in that batch. Currently during this,  Kafka does not
> > > allow cancelling the pending reassignments cleanly.  Even you have the
> > > json with the "original" assignments to rollback, it has to wait till
> > > current reassignment to complete, then submit it as reassignments to
> > > rollback. If the current reassignment is causing impact to production,
> > > we would like the reassignments to be cancelled/rollbacked
> > > cleanly/safely/quickly.  This is the main goal of KIP-236.
> > >
> > > The original KIP-236 by Tom Bentley also proposed the incremental
> > > reassignments, to submit new reassignments while the current
> > > reassignments is still going on. This is scaled back to put under
> > > "Planned Future Changes" section of KIP-236, so we can expedite this
> > > Reassignment Cancellation/Rollback feature out to the community.
> > >
> > > The main idea incremental reassignment is to allow submit new
> > > reassignments in another ZK node /admin/reassign_partitions_queue  and
> > > merge it with current pending reassignments in
> > > /admin/reassign_partitions.  In case of same topic/partition in both
> ZK
> > > node, the conflict resolution is to cancel the current reassignment in
> > > /admin/reassign_partitions, and move the same topic/partition
> > > from /admin/reassign_partitions_queue  as new reassignment.
> > >
> > > If there is enough interest from the community, this "Planned Future
> > > Changes" for incremental reassignments can also be delivered in
> > > KIP-236, otherwise, another KIP.  The current
> > > PR:   https://github.com/apache/kafka/pull/6296  only
> focuses/addresses
> > > the pending Reassignment Cancellation/Rollback.
> > >
> > > Hope this answers your questions.
> > >
> > > Thanks,George
> > >
> > >    On Friday, February 22, 2019, 6:51:14 AM PST, Viktor Somogyi-Vass
> > > <viktorsomo...@gmail.com> wrote:
> > >
> > >  Read through the KIP and I have one comment:
> > >
> > > It seems like it is not looking strictly for cancellation but also
> > > implements rolling back to the original. I think it'd be much simpler
> to
> > > generate a reassignment json on cancellation that contains the original
> > > assignment and start a new partition reassignment completely. This way
> the
> > > reassignment algorithm (whatever it is) could be reused as a whole.
> Did you
> > > consider this or are there any obstacles that prevents doing this?
> > >
> > > Regards,
> > > Viktor
> > >
> > > On Fri, Feb 22, 2019 at 2:24 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com>
> > > wrote:
> > >
> > > > I've published the above mentioned KIP here:
> > > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-435%3A+Incremental+Partition+Reassignment
> > > > Will start a discussion about it soon.
> > > >
> > > > On Fri, Feb 22, 2019 at 12:45 PM Viktor Somogyi-Vass <
> > > > viktorsomo...@gmail.com> wrote:
> > > >
> > > >> Hi Folks,
> > > >>
> > > >> I also have a pending active work on the incremental partition
> > > >> reassignment stuff here:
> https://issues.apache.org/jira/browse/KAFKA-6794
> > > >> I think it would be good to cooperate on this to make both work
> > > >> compatible with each other.
> > > >>
> > > >> I'll write up a KIP about this today so it'll be easier to see how
> to fit
> > > >> the two together. Basically in my work I operate on the
> > > >> /admin/reassign_partitions node on a fully compatible way, meaning
> I won't
> > > >> change it just calculate each increment based on that and the
> current state
> > > >> of the ISR set for the partition in reassignment.
> > > >> I hope we could collaborate on this.
> > > >>
> > > >> Viktor
> > > >>
> > > >> On Thu, Feb 21, 2019 at 9:04 PM Harsha <ka...@harsha.io> wrote:
> > > >>
> > > >>> Thanks George. LGTM.
> > > >>> Jun & Tom, Can you please take a look at the updated KIP.
> > > >>> Thanks,
> > > >>> Harsha
> > > >>>
> > > >>> On Wed, Feb 20, 2019, at 12:18 PM, George Li wrote:
> > > >>> > Hi,
> > > >>> >
> > > >>> > After discussing with Tom, Harsha and I are picking up KIP-236 <
> > > >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment
> >.
> > > >>> The work focused on safely/cleanly cancel / rollback pending
> reassignments
> > > >>> in a timely fashion. Pull Request #6296 <
> > > >>> https://github.com/apache/kafka/pull/6296> Still working on more
> > > >>> integration/system tests.
> > > >>> >
> > > >>> > Please review and provide feedbacks/suggestions.
> > > >>> >
> > > >>> > Thanks,
> > > >>> > George
> > > >>> >
> > > >>> >
> > > >>> > On Saturday, December 23, 2017, 0:51:13 GMT, Jun Rao <
> j...@confluent.io>
> > > >>> wrote:
> > > >>> >
> > > >>> > Hi, Tom,
> > > >>>
> > > >>> Thanks for the reply.
> > > >>>
> > > >>> 10. That's a good thought. Perhaps it's better to get rid of
> > > >>> /admin/reassignment_requests
> > > >>> too. The window when a controller is not available is small. So,
> we can
> > > >>> just failed the admin client if the controller is not reachable
> after the
> > > >>> timeout.
> > > >>>
> > > >>> 13. With the changes in 10, the old approach is handled through ZK
> > > >>> callback
> > > >>> and the new approach is through Kafka RP The ordering between the
> two
> > > >>> is
> > > >>> kind of arbitrary. Perhaps the ordering can just be based on the
> order
> > > >>> that
> > > >>> the reassignment is added to the controller request queue. From
> there, we
> > > >>> can either do the overriding or the prevention.
> > > >>>
> > > >>> Jun
> > > >>>
> > > >>>
> > > >>> On Fri, Dec 22, 2017 at 7:31 AM, Tom Bentley <
> t.j.bent...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>> > Hi Jun,
> > > >>> >
> > > >>> > Thanks for responding, my replies are inline:
> > > >>> >
> > > >>> > 10. You explanation makes sense. My remaining concern is the
> > > >>> additional ZK
> > > >>> > > writes in the proposal. With the proposal, we will need to do
> > > >>> following
> > > >>> > > writes in ZK.
> > > >>> > >
> > > >>> > > a. write new assignment in /admin/reassignment_requests
> > > >>> > >
> > > >>> > > b. write new assignment and additional metadata in
> > > >>> > > /admin/reassignments/$topic/$partition
> > > >>> > >
> > > >>> > > c. write old + new assignment  in /brokers/topics/[topic]
> > > >>> > >
> > > >>> > > d. write new assignment in /brokers/topics/[topic]
> > > >>> > >
> > > >>> > > e. delete /admin/reassignments/$topic/$partition
> > > >>> > >
> > > >>> > > So, there are quite a few ZK writes. I am wondering if it's
> better to
> > > >>> > > consolidate the info in /admin/reassignments/$topic/$partition
> into
> > > >>> > > /brokers/topics/[topic].
> > > >>> > > For example, we can just add some new JSON fields in
> > > >>> > > /brokers/topics/[topic]
> > > >>> > > to remember the new assignment and potentially the original
> replica
> > > >>> count
> > > >>> > > when doing step c. Those fields with then be removed in step
> d. That
> > > >>> way,
> > > >>> > > we can get rid of step b and e, saving 2 ZK writes per
> partition.
> > > >>> > >
> > > >>> >
> > > >>> > This seems like a great idea to me.
> > > >>> >
> > > >>> > It might also be possible to get rid of the
> > > >>> /admin/reassignment_requests
> > > >>> > subtree too. I've not yet published the ideas I have for the
> > > >>> AdminClient
> > > >>> > API for reassigning partitions, but given the existence of such
> an
> > > >>> API, the
> > > >>> > route to starting a reassignment would be the AdminClient, and
> not
> > > >>> > zookeeper. In that case there is no need for
> > > >>> /admin/reassignment_requests
> > > >>> > at all. The only drawback that I can see is that while it's
> currently
> > > >>> > possible to trigger a reassignment even during a controller
> > > >>> > election/failover that would no longer be the case if all
> requests had
> > > >>> to
> > > >>> > go via the controller.
> > > >>> >
> > > >>> >
> > > >>> > > 11. What you described sounds good. We could potentially
> optimize the
> > > >>> > > dropped replicas a bit more. Suppose that assignment [0,1,2]
> is first
> > > >>> > > changed to [1,2,3] and then to [2,3,4]. When initiating the
> second
> > > >>> > > assignment, we may end up dropping replica 3 and only to
> restart it
> > > >>> > again.
> > > >>> > > In this case, we could only drop a replica if it's not going
> to be
> > > >>> added
> > > >>> > > back again.
> > > >>> > >
> > > >>> >
> > > >>> > I had missed that, thank you! I will update the proposed
> algorithm to
> > > >>> > prevent this.
> > > >>> >
> > > >>> >
> > > >>> > > 13. Since this is a corner case, we can either prevent or allow
> > > >>> > overriding
> > > >>> > > with old/new mechanisms. To me, it seems that allowing is
> simpler to
> > > >>> > > implement, the order in /admin/reassignment_requests
> determines the
> > > >>> > > ordering the of override, whether that's initiated by the new
> way or
> > > >>> the
> > > >>> > > old way.
> > > >>> > >
> > > >>> >
> > > >>> > That makes sense except for the corner case where:
> > > >>> >
> > > >>> > * There is no current controller and
> > > >>> > * Writes to both the new and old znodes happen
> > > >>> >
> > > >>> > On election of the new controller, for those partitions with
> both a
> > > >>> > reassignment_request and in /admin/reassign_partitions, we have
> to
> > > >>> decide
> > > >>> > which should win. You could use the modification time, though
> there are
> > > >>> > some very unlikely scenarios where that doesn't work properly,
> for
> > > >>> example
> > > >>> > if both znodes have the same mtime, or the
> /admin/reassign_partitions
> > > >>> was
> > > >>> > updated, but the assignment of the partition wasn't changed,
> like this:
> > > >>> >
> > > >>> > 0. /admin/reassign_partitions has my-topic/42 = [1,2,3]
> > > >>> > 1. Controller stops watching.
> > > >>> > 2. Create /admin/reassignment_requests/request_1234 to change the
> > > >>> > reassignment of partition my-topic/42 = [4,5,6]
> > > >>> > 3. Update /admin/reassign_partitions to add your-topic/12=[7,8,9]
> > > >>> > 4. New controller resumes
> > > >>> >
> > > >>> >
> > > >>> >
> > > >>> > > Thanks,
> > > >>> > >
> > > >>> > > Jun
> > > >>> > >
> > > >>> > > On Tue, Dec 19, 2017 at 2:43 AM, Tom Bentley <
> t.j.bent...@gmail.com>
> > > >>> > > wrote:
> > > >>> > >
> > > >>> > > > Hi Jun,
> > > >>> > > >
> > > >>> > > > 10. Another concern of mine is on consistency with the
> current
> > > >>> pattern.
> > > >>> > > The
> > > >>> > > > > current pattern for change notification based on ZK is (1)
> we
> > > >>> first
> > > >>> > > write
> > > >>> > > > > the actual value in the entity path and then write the
> change
> > > >>> > > > notification
> > > >>> > > > > path, and (2)  the change notification path only includes
> what
> > > >>> entity
> > > >>> > > has
> > > >>> > > > > changed but not the actual changes. If we want to follow
> this
> > > >>> pattern
> > > >>> > > for
> > > >>> > > > > consistency, /admin/reassignment_requests/request_xxx will
> only
> > > >>> have
> > > >>> > > the
> > > >>> > > > > partitions whose reassignment have changed, but not the
> actual
> > > >>> > > > > reassignment.
> > > >>> > > > >
> > > >>> > > >
> > > >>> > > > Ah, I hadn't understood part (2). That means my concern about
> > > >>> > efficiency
> > > >>> > > > with the current pattern is misplaced. There are still some
> > > >>> interesting
> > > >>> > > > differences in semantics, however:
> > > >>> > > >
> > > >>> > > > a) The mechanism currently proposed in KIP-236 means that the
> > > >>> > controller
> > > >>> > > is
> > > >>> > > > the only writer to /admin/reassignments. This means it can
> include
> > > >>> > > > information in these znodes that requesters might not know,
> or
> > > >>> > > information
> > > >>> > > > that's necessary to perform the reassignment but not
> necessary to
> > > >>> > > describe
> > > >>> > > > the request. While this could be handled using the current
> pattern
> > > >>> it
> > > >>> > > would
> > > >>> > > > rely on all  writers to preserve any information added by the
> > > >>> > controller,
> > > >>> > > > which seems complicated and hence fragile.
> > > >>> > > >
> > > >>> > > > b) The current pattern for change notification doesn't cope
> with
> > > >>> > > competing
> > > >>> > > > writers to the entity path: If two processes write to the
> entity
> > > >>> path
> > > >>> > > > before the controller can read it (due to notification) then
> one
> > > >>> set of
> > > >>> > > > updates will be lost.
> > > >>> > > >
> > > >>> > > > c) If a single writing process crashes after writing to the
> entity
> > > >>> > path,
> > > >>> > > > but before writing to the notification path then the write
> will be
> > > >>> > lost.
> > > >>> > > >
> > > >>> > > > I'm actually using point a) in my WIP (see below). Points b)
> and
> > > >>> c) are
> > > >>> > > > obviously edge cases.
> > > >>> > > >
> > > >>> > > >
> > > >>> > > > > 11. Ok. I am not sure that I fully understand the
> description of
> > > >>> that
> > > >>> > > > part.
> > > >>> > > > > Does "assigned" refer to the current assignment? Could you
> also
> > > >>> > > describe
> > > >>> > > > > where the length of the original assignment is stored in
> ZK?
> > > >>> > > > >
> > > >>> > > >
> > > >>> > > > Sorry if the description is not clear. Yes, "assigned"
> referrs to
> > > >>> the
> > > >>> > > > currently assigned replicas (taken from the
> > > >>> > > > ControllerContext.partitionReplicaAssignment). I would store
> the
> > > >>> > length
> > > >>> > > of
> > > >>> > > > the original assignment in the
> > > >>> /admin/reassignments/$topic/$partition
> > > >>> > > > znode
> > > >>> > > > (this is where the point (a) above is useful -- the requester
> > > >>> shouldn't
> > > >>> > > > know that this information is used by the controller).
> > > >>> > > >
> > > >>> > > > I've updated the KIP to make these points clearer.
> > > >>> > > >
> > > >>> > > >
> > > >>> > > > > 13. Hmm, I am not sure that the cancellation needs to be
> done
> > > >>> for the
> > > >>> > > > whole
> > > >>> > > > > batch. The reason that I brought this up is for
> consistency. The
> > > >>> KIP
> > > >>> > > > allows
> > > >>> > > > > override when using the new approach. It just seems that
> it's
> > > >>> simpler
> > > >>> > > to
> > > >>> > > > > extend this model when resolving multiple changes between
> the
> > > >>> old and
> > > >>> > > the
> > > >>> > > > > new approach.
> > > >>> > > >
> > > >>> > > >
> > > >>> > > > Ah, I think I've been unclear on this point too. Currently
> the
> > > >>> > > > ReassignPartitionsCommand enforces that you can't change
> > > >>> reassignments,
> > > >>> > > but
> > > >>> > > > this doesn't stop other ZK clients making changes to
> > > >>> > > > /admin/reassign_partitions directly and I believe some Kafka
> users
> > > >>> do
> > > >>> > > > indeed change reassignments in-flight by writing to
> > > >>> > > > /admin/reassign_partitions. What I'm proposing doesn't break
> that
> > > >>> at
> > > >>> > all.
> > > >>> > > > The semantic I've implemented is only that the controller
> only
> > > >>> refuses
> > > >>> > a
> > > >>> > > > reassignment change if there is already one in-flight (i.e.
> in
> > > >>> > > > /admin/reassignments/$topic/$partition) **via the other
> > > >>> mechansim**.
> > > >>> > So
> > > >>> > > if
> > > >>> > > > you're using /admin/reassign_partitions and you change or
> cancel
> > > >>> part
> > > >>> > of
> > > >>> > > it
> > > >>> > > > via /admin/reassign_partitions, that's OK. Likewise if
> you're using
> > > >>> > > > /admin/reassignment_request/request_xxx and you change or
> cancel
> > > >>> part
> > > >>> > of
> > > >>> > > > it
> > > >>> > > > via another /admin/reassignment_request/request_xxx, that's
> OK.What
> > > >>> > you
> > > >>> > > > can't do is change a request that was started via
> > > >>> > > > /admin/reassign_partitions via /admin/reassignment_request/
> > > >>> > request_xxx,
> > > >>> > > or
> > > >>> > > > vice versa.
> > > >>> > > >
> > > >>> > > > What I was thinking of when I replied is the case where, on
> > > >>> controller
> > > >>> > > > failover, /admin/reassign_partitions has been changed and
> > > >>> > > > /admin/reassignment_request/request_xxx created (in the
> period when
> > > >>> > the
> > > >>> > > > new
> > > >>> > > > controller was being elected, for example) with a common
> > > >>> partition. In
> > > >>> > > this
> > > >>> > > > case we should apply a consistent rule to that used when the
> > > >>> > notification
> > > >>> > > > happen in real time. Your suggestion to use the modification
> time
> > > >>> of
> > > >>> > the
> > > >>> > > > znode would work here too (except in the edge case where ZK
> writes
> > > >>> to
> > > >>> > > both
> > > >>> > > > znodes happen within the same clock tick on the ZK server,
> so the
> > > >>> > mtimes
> > > >>> > > > are the same).
> > > >>> > > >
> > > >>> > > > Let me know if you think this is the right semantic and I'll
> try to
> > > >>> > > clarify
> > > >>> > > > the KIP.
> > > >>> > > >
> > > >>> > > > Many thanks,
> > > >>> > > >
> > > >>> > > > Tom
> > > >>> > > >
> > > >>> > > > On 18 December 2017 at 18:12, Jun Rao <j...@confluent.io>
> wrote:
> > > >>> > > >
> > > >>> > > > > Hi, Tom,
> > > >>> > > > >
> > > >>> > > > > Thanks for the reply. A few more followup comments below.
> > > >>> > > > >
> > > >>> > > > > 10. Another concern of mine is on consistency with the
> current
> > > >>> > pattern.
> > > >>> > > > The
> > > >>> > > > > current pattern for change notification based on ZK is (1)
> we
> > > >>> first
> > > >>> > > write
> > > >>> > > > > the actual value in the entity path and then write the
> change
> > > >>> > > > notification
> > > >>> > > > > path, and (2)  the change notification path only includes
> what
> > > >>> entity
> > > >>> > > has
> > > >>> > > > > changed but not the actual changes. If we want to follow
> this
> > > >>> pattern
> > > >>> > > for
> > > >>> > > > > consistency, /admin/reassignment_requests/request_xxx will
> only
> > > >>> have
> > > >>> > > the
> > > >>> > > > > partitions whose reassignment have changed, but not the
> actual
> > > >>> > > > > reassignment.
> > > >>> > > > >
> > > >>> > > > > 11. Ok. I am not sure that I fully understand the
> description of
> > > >>> that
> > > >>> > > > part.
> > > >>> > > > > Does "assigned" refer to the current assignment? Could you
> also
> > > >>> > > describe
> > > >>> > > > > where the length of the original assignment is stored in
> ZK?
> > > >>> > > > >
> > > >>> > > > > 13. Hmm, I am not sure that the cancellation needs to be
> done
> > > >>> for the
> > > >>> > > > whole
> > > >>> > > > > batch. The reason that I brought this up is for
> consistency. The
> > > >>> KIP
> > > >>> > > > allows
> > > >>> > > > > override when using the new approach. It just seems that
> it's
> > > >>> simpler
> > > >>> > > to
> > > >>> > > > > extend this model when resolving multiple changes between
> the
> > > >>> old and
> > > >>> > > the
> > > >>> > > > > new approach.
> > > >>> > > > >
> > > >>> > > > > Jun
> > > >>> > > > >
> > > >>> > > > >
> > > >>> > > > >
> > > >>> > > > > On Mon, Dec 18, 2017 at 2:45 AM, Tom Bentley <
> > > >>> t.j.bent...@gmail.com>
> > > >>> > > > > wrote:
> > > >>> > > > >
> > > >>> > > > > > Hi Jun,
> > > >>> > > > > >
> > > >>> > > > > > Thanks for replying, some answers below:
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > > > 10. The proposal now stores the reassignment for all
> > > >>> partitions
> > > >>> > in
> > > >>> > > > > > > /admin/reassignment_requests/request_xxx. If the
> number of
> > > >>> > > > reassigned
> > > >>> > > > > > > partitions is larger, the ZK write may hit the default
> 1MB
> > > >>> limit
> > > >>> > > and
> > > >>> > > > > > fail.
> > > >>> > > > > > > An alternative approach is to have the reassignment
> requester
> > > >>> > first
> > > >>> > > > > write
> > > >>> > > > > > > the new assignment for each partition under
> > > >>> > > > > > > /admin/reassignments/$topic/$partition and then write
> > > >>> > > > > > > /admin/reassignment_requests/request_xxx with an empty
> value.
> > > >>> > The
> > > >>> > > > > > > controller can then read all values under
> > > >>> /admin/reassignments.
> > > >>> > > > > > >
> > > >>> > > > > >
> > > >>> > > > > > You're right that reassigning enough partitions would
> hit the
> > > >>> 1MB
> > > >>> > > > limit,
> > > >>> > > > > > but I don't think this would be a problem in practice
> because
> > > >>> it
> > > >>> > > would
> > > >>> > > > be
> > > >>> > > > > > trivial to split the partitions into several requests
> (i.e.
> > > >>> > mutleiple
> > > >>> > > > > > request_xxx).
> > > >>> > > > > > I don't think the non-atomicity this would imply is a
> problem.
> > > >>> By
> > > >>> > > > writing
> > > >>> > > > > > the partitions whose
> /admin/reassignments/$topic/$partition has
> > > >>> > been
> > > >>> > > > > > created or changed it makes it much more efficient to
> know
> > > >>> which of
> > > >>> > > > those
> > > >>> > > > > > znodes we need to read. If I understand your suggestion,
> you
> > > >>> would
> > > >>> > > have
> > > >>> > > > > to
> > > >>> > > > > > read every node under /admin/reassignments to figure out
> which
> > > >>> had
> > > >>> > > > > changed.
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > > > 11. The improvement you suggested in
> > > >>> onPartitionReassignment()
> > > >>> > > sounds
> > > >>> > > > > > good
> > > >>> > > > > > > at the high level. The computation of those dropped
> > > >>> partitions
> > > >>> > > seems
> > > >>> > > > a
> > > >>> > > > > > bit
> > > >>> > > > > > > complicated. Perhaps a simple approach is to drop the
> > > >>> replicas
> > > >>> > not
> > > >>> > > in
> > > >>> > > > > the
> > > >>> > > > > > > original assignment and newest reassignment?
> > > >>> > > > > > >
> > > >>> > > > > >
> > > >>> > > > > > This was what I came up with originally too, but when I
> looked
> > > >>> into
> > > >>> > > > > > implementing it I found a couple of things which made me
> > > >>> reconsider
> > > >>> > > it.
> > > >>> > > > > > Consider the reassignments [0,1] -> [2,3] -> [3,4]. In
> words:
> > > >>> we
> > > >>> > > start
> > > >>> > > > > > reassigning to [2,3], but then change our minds about 2
> and
> > > >>> switch
> > > >>> > it
> > > >>> > > > to
> > > >>> > > > > 4
> > > >>> > > > > > (maybe we've figured out a better overall balance). At
> that
> > > >>> point
> > > >>> > it
> > > >>> > > is
> > > >>> > > > > > perfectly possible that broker 2 is in-sync and broker 1
> is not
> > > >>> > > > in-sync.
> > > >>> > > > > It
> > > >>> > > > > > seems silly to drop broker 2 in favour of broker 1: We're
> > > >>> > needlessly
> > > >>> > > > > giving
> > > >>> > > > > > the cluster more work to do.
> > > >>> > > > > >
> > > >>> > > > > > The second thing that made me reconsider was in that same
> > > >>> scenario
> > > >>> > > it's
> > > >>> > > > > > even possible that broker 2 is the leader of the
> partition.
> > > >>> > Obviously
> > > >>> > > > we
> > > >>> > > > > > can elect a new leader before dropping it, but not
> without
> > > >>> causing
> > > >>> > > > > > disruption to producers and consumers.
> > > >>> > > > > >
> > > >>> > > > > > By accepting a little more complexity in choosing which
> > > >>> brokers to
> > > >>> > > drop
> > > >>> > > > > we
> > > >>> > > > > > make the dropping simpler (no need for leader election)
> and
> > > >>> ensure
> > > >>> > > the
> > > >>> > > > > > cluster has less work to do.
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > > > 12. You brought up the need of remembering the original
> > > >>> > assignment.
> > > >>> > > > > This
> > > >>> > > > > > > will be lost if the assignment is changed multiple
> times if
> > > >>> we
> > > >>> > > follow
> > > >>> > > > > the
> > > >>> > > > > > > approach described in 10. One way is to store the
> original
> > > >>> > > assignment
> > > >>> > > > > in
> > > >>> > > > > > > /brokers/topics/[topic] as the following. When the
> final
> > > >>> > > reassignment
> > > >>> > > > > > > completes, we can remove the original field.
> > > >>> > > > > > > {
> > > >>> > > > > > >  "version": 1,
> > > >>> > > > > > >  "partitions": {"0": [0, 1, 3] },
> > > >>> > > > > > >  "originals": {"0": [0, 1, 2] }
> > > >>> > > > > > > }
> > > >>> > > > > > >
> > > >>> > > > > >
> > > >>> > > > > > While I was implementing my first version of
> > > >>> > > onPartitionReassignment(),
> > > >>> > > > > > where I preferred the originals, I was storing the
> originals
> > > >>> in the
> > > >>> > > > > > /admin/reassignments/$topic/$partition znodes. Since we
> will
> > > >>> > remove
> > > >>> > > > that
> > > >>> > > > > > znode at the end of reassignment anyway, I would suggest
> this
> > > >>> is a
> > > >>> > > > better
> > > >>> > > > > > place to store that data (if it's necessary to do so),
> so that
> > > >>> we
> > > >>> > can
> > > >>> > > > > avoid
> > > >>> > > > > > another ZK round trip.
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > > > 13. For resolving the conflict between
> > > >>> /admin/reassign_partitions
> > > >>> > > and
> > > >>> > > > > > > /admin/reassignments/$topic/$partition, perhaps it's
> more
> > > >>> > natural
> > > >>> > > to
> > > >>> > > > > > just
> > > >>> > > > > > > let the assignment with a newer timestamp to override
> the
> > > >>> older
> > > >>> > > one?
> > > >>> > > > > > >
> > > >>> > > > > >
> > > >>> > > > > > That would work but with slightly different semantics to
> what I
> > > >>> > have:
> > > >>> > > > > Since
> > > >>> > > > > > /admin/reassign_partitions contains multiple partitions,
> using
> > > >>> the
> > > >>> > > > > > timestamp means the whole batch wins or losses. By
> tracking how
> > > >>> > each
> > > >>> > > > > > request was made we can be more fine-grained. I'm to use
> the
> > > >>> > > > modification
> > > >>> > > > > > time if such granularity is not required.
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > > > 14. Implementation wise, currently, we register a
> watcher of
> > > >>> the
> > > >>> > > isr
> > > >>> > > > > path
> > > >>> > > > > > > of each partition being reassigned. This has the
> potential
> > > >>> issue
> > > >>> > of
> > > >>> > > > > > > registering many listeners. An improvement could be
> just
> > > >>> > > piggybacking
> > > >>> > > > > on
> > > >>> > > > > > > the existing IsrChangeNotificationHandler, which only
> > > >>> watches a
> > > >>> > > > single
> > > >>> > > > > ZK
> > > >>> > > > > > > path and is triggered on a batch of isr changes. This
> is
> > > >>> kind of
> > > >>> > > > > > orthogonal
> > > >>> > > > > > > to the KIP. However, if we are touching the
> reassignment
> > > >>> logic,
> > > >>> > it
> > > >>> > > > may
> > > >>> > > > > be
> > > >>> > > > > > > worth considering.
> > > >>> > > > > >
> > > >>> > > > > >
> > > >>> > > > > > Let me look into that.
> > > >>> > > > > >
> > > >>> > > > > > Thanks,
> > > >>> > > > > >
> > > >>> > > > > > Tom
> > > >>> > > > > >
> > > >>> > > > > > On 16 December 2017 at 02:19, Jun Rao <j...@confluent.io>
> > > >>> wrote:
> > > >>> > > > > >
> > > >>> > > > > > > Hi, Tom,
> > > >>> > > > > > >
> > > >>> > > > > > > Thanks for the updated KIP. A few more comments below.
> > > >>> > > > > > >
> > > >>> > > > > > > 10. The proposal now stores the reassignment for all
> > > >>> partitions
> > > >>> > in
> > > >>> > > > > > > /admin/reassignment_requests/request_xxx. If the
> number of
> > > >>> > > > reassigned
> > > >>> > > > > > > partitions is larger, the ZK write may hit the default
> 1MB
> > > >>> limit
> > > >>> > > and
> > > >>> > > > > > fail.
> > > >>> > > > > > > An alternative approach is to have the reassignment
> requester
> > > >>> > first
> > > >>> > > > > write
> > > >>> > > > > > > the new assignment for each partition under
> > > >>> > > > > > > /admin/reassignments/$topic/$partition and then write
> > > >>> > > > > > > /admin/reassignment_requests/request_xxx with an empty
> value.
> > > >>> > The
> > > >>> > > > > > > controller can then read all values under
> > > >>> /admin/reassignments.
> > > >>> > > > > > >
> > > >>> > > > > > > 11. The improvement you suggested in
> > > >>> onPartitionReassignment()
> > > >>> > > sounds
> > > >>> > > > > > good
> > > >>> > > > > > > at the high level. The computation of those dropped
> > > >>> partitions
> > > >>> > > seems
> > > >>> > > > a
> > > >>> > > > > > bit
> > > >>> > > > > > > complicated. Perhaps a simple approach is to drop the
> > > >>> replicas
> > > >>> > not
> > > >>> > > in
> > > >>> > > > > the
> > > >>> > > > > > > original assignment and newest reassignment?
> > > >>> > > > > > >
> > > >>> > > > > > > 12. You brought up the need of remembering the original
> > > >>> > assignment.
> > > >>> > > > > This
> > > >>> > > > > > > will be lost if the assignment is changed multiple
> times if
> > > >>> we
> > > >>> > > follow
> > > >>> > > > > the
> > > >>> > > > > > > approach described in 10. One way is to store the
> original
> > > >>> > > assignment
> > > >>> > > > > in
> > > >>> > > > > > > /brokers/topics/[topic] as the following. When the
> final
> > > >>> > > reassignment
> > > >>> > > > > > > completes, we can remove the original field.
> > > >>> > > > > > > {
> > > >>> > > > > > >  "version": 1,
> > > >>> > > > > > >  "partitions": {"0": [0, 1, 3] },
> > > >>> > > > > > >  "originals": {"0": [0, 1, 2] }
> > > >>> > > > > > > }
> > > >>> > > > > > >
> > > >>> > > > > > > 13. For resolving the conflict between
> > > >>> /admin/reassign_partitions
> > > >>> > > and
> > > >>> > > > > > > /admin/reassignments/$topic/$partition, perhaps it's
> more
> > > >>> > natural
> > > >>> > > to
> > > >>> > > > > > just
> > > >>> > > > > > > let the assignment with a newer timestamp to override
> the
> > > >>> older
> > > >>> > > one?
> > > >>> > > > > > >
> > > >>> > > > > > > 14. Implementation wise, currently, we register a
> watcher of
> > > >>> the
> > > >>> > > isr
> > > >>> > > > > path
> > > >>> > > > > > > of each partition being reassigned. This has the
> potential
> > > >>> issue
> > > >>> > of
> > > >>> > > > > > > registering many listeners. An improvement could be
> just
> > > >>> > > piggybacking
> > > >>> > > > > on
> > > >>> > > > > > > the existing IsrChangeNotificationHandler, which only
> > > >>> watches a
> > > >>> > > > single
> > > >>> > > > > ZK
> > > >>> > > > > > > path and is triggered on a batch of isr changes. This
> is
> > > >>> kind of
> > > >>> > > > > > orthogonal
> > > >>> > > > > > > to the KIP. However, if we are touching the
> reassignment
> > > >>> logic,
> > > >>> > it
> > > >>> > > > may
> > > >>> > > > > be
> > > >>> > > > > > > worth considering.
> > > >>> > > > > > >
> > > >>> > > > > > > Thanks,
> > > >>> > > > > > >
> > > >>> > > > > > > Jun
> > > >>> > > > > > >
> > > >>> > > > > > > On Fri, Dec 15, 2017 at 10:17 AM, Tom Bentley <
> > > >>> > > t.j.bent...@gmail.com
> > > >>> > > > >
> > > >>> > > > > > > wrote:
> > > >>> > > > > > >
> > > >>> > > > > > > > Just wanted to mention that I've started KIP-240,
> which
> > > >>> builds
> > > >>> > on
> > > >>> > > > top
> > > >>> > > > > > of
> > > >>> > > > > > > > this one to provide an AdminClient API for listing
> and
> > > >>> > describing
> > > >>> > > > > > > > reassignments.
> > > >>> > > > > > > >
> > > >>> > > > > > > > On 15 December 2017 at 14:34, Tom Bentley <
> > > >>> > t.j.bent...@gmail.com
> > > >>> > > >
> > > >>> > > > > > wrote:
> > > >>> > > > > > > >
> > > >>> > > > > > > > > > Should we seek to improve this algorithm in this
> KIP,
> > > >>> or
> > > >>> > > leave
> > > >>> > > > > that
> > > >>> > > > > > > as
> > > >>> > > > > > > > > a later optimisation?
> > > >>> > > > > > > > >
> > > >>> > > > > > > > > I've updated the KIP with a proposed algorithm.
> > > >>> > > > > > > > >
> > > >>> > > > > > > > >
> > > >>> > > > > > > > >
> > > >>> > > > > > > > >
> > > >>> > > > > > > > >
> > > >>> > > > > > > > >
> > > >>> > > > > > > > > On 14 December 2017 at 09:57, Tom Bentley <
> > > >>> > > t.j.bent...@gmail.com
> > > >>> > > > >
> > > >>> > > > > > > wrote:
> > > >>> > > > > > > > >
> > > >>> > > > > > > > >> Thanks Ted, now fixed.
> > > >>> > > > > > > > >>
> > > >>> > > > > > > > >> On 13 December 2017 at 18:38, Ted Yu <
> > > >>> yuzhih...@gmail.com>
> > > >>> > > > wrote:
> > > >>> > > > > > > > >>
> > > >>> > > > > > > > >>> Tom:
> > > >>> > > > > > > > >>> bq. create a znode
> > > >>> /admin/reassignments/$topic-$partition
> > > >>> > > > > > > > >>>
> > > >>> > > > > > > > >>> Looks like the tree structure above should be:
> > > >>> > > > > > > > >>>
> > > >>> > > > > > > > >>> /admin/reassignments/$topic/$partition
> > > >>> > > > > > > > >>>
> > > >>> > > > > > > > >>> bq. The controller removes
> /admin/reassignment/$topic/$
> > > >>> > > > partition
> > > >>> > > > > > > > >>>
> > > >>> > > > > > > > >>> Note the lack of 's' for reassignment. It would
> be
> > > >>> good to
> > > >>> > > make
> > > >>> > > > > > > > zookeeper
> > > >>> > > > > > > > >>> paths consistent.
> > > >>> > > > > > > > >>>
> > > >>> > > > > > > > >>> Thanks
> > > >>> > > > > > > > >>>
> > > >>> > > > > > > > >>> On Wed, Dec 13, 2017 at 9:49 AM, Tom Bentley <
> > > >>> > > > > > t.j.bent...@gmail.com>
> > > >>> > > > > > > > >>> wrote:
> > > >>> > > > > > > > >>>
> > > >>> > > > > > > > >>> > Hi Jun and Ted,
> > > >>> > > > > > > > >>> >
> > > >>> > > > > > > > >>> > Jun, you're right that needing one watcher per
> > > >>> reassigned
> > > >>> > > > > > partition
> > > >>> > > > > > > > >>> > presents a scalability problem, and using a
> separate
> > > >>> > > > > notification
> > > >>> > > > > > > > path
> > > >>> > > > > > > > >>> > solves that. I also agree that it makes sense
> to
> > > >>> prevent
> > > >>> > > > users
> > > >>> > > > > > from
> > > >>> > > > > > > > >>> using
> > > >>> > > > > > > > >>> > both methods on the same reassignment.
> > > >>> > > > > > > > >>> >
> > > >>> > > > > > > > >>> > Ted, naming the reassignments like mytopic-42
> was
> > > >>> simpler
> > > >>> > > > > while I
> > > >>> > > > > > > was
> > > >>> > > > > > > > >>> > proposing a watcher-per-reassignment (I'd have
> > > >>> needed a
> > > >>> > > child
> > > >>> > > > > > > watcher
> > > >>> > > > > > > > >>> on
> > > >>> > > > > > > > >>> > /admin/reassignments and also on
> > > >>> > > > /admin/reassignments/mytopic).
> > > >>> > > > > > > Using
> > > >>> > > > > > > > >>> the
> > > >>> > > > > > > > >>> > separate notification path means I don't need
> any
> > > >>> > watchers
> > > >>> > > in
> > > >>> > > > > the
> > > >>> > > > > > > > >>> > /admin/reassignments subtree, so switching to
> > > >>> > > > > > > > >>> /admin/reassignments/mytopic/
> > > >>> > > > > > > > >>> > 42
> > > >>> > > > > > > > >>> > would work, and avoid /admin/reassignments
> having a
> > > >>> very
> > > >>> > > > large
> > > >>> > > > > > > number
> > > >>> > > > > > > > >>> of
> > > >>> > > > > > > > >>> > child nodes. On the other hand it also means I
> have
> > > >>> to
> > > >>> > > create
> > > >>> > > > > and
> > > >>> > > > > > > > >>> delete
> > > >>> > > > > > > > >>> > the topic nodes (e.g.
> /admin/reassignments/mytopic),
> > > >>> > which
> > > >>> > > > > incurs
> > > >>> > > > > > > the
> > > >>> > > > > > > > >>> cost
> > > >>> > > > > > > > >>> > of extra round trips to zookeeper. I suppose
> that
> > > >>> since
> > > >>> > > > > > > reassignment
> > > >>> > > > > > > > is
> > > >>> > > > > > > > >>> > generally a slow process it makes little
> difference
> > > >>> if we
> > > >>> > > > > > increase
> > > >>> > > > > > > > the
> > > >>> > > > > > > > >>> > latency of the interactions with zookeeper.
> > > >>> > > > > > > > >>> >
> > > >>> > > > > > > > >>> > I have updated the KIP with these
> improvements, and a
> > > >>> > more
> > > >>> > > > > > detailed
> > > >>> > > > > > > > >>> > description of exactly how we would manage
> these
> > > >>> znodes.
> > > >>> > > > > > > > >>> >
> > > >>> > > > > > > > >>> > Reading the algorithm in KafkaController.
> > > >>> > > > > > > onPartitionReassignment(),
> > > >>> > > > > > > > it
> > > >>> > > > > > > > >>> > seems that it would be suboptimal for changing
> > > >>> > > reassignments
> > > >>> > > > > > > > in-flight.
> > > >>> > > > > > > > >>> > Consider an initial assignment of [1,2],
> reassigned
> > > >>> to
> > > >>> > > [2,3]
> > > >>> > > > > and
> > > >>> > > > > > > then
> > > >>> > > > > > > > >>> > changed to [2,4]. Broker 3 will remain in the
> > > >>> assigned
> > > >>> > > > replicas
> > > >>> > > > > > > until
> > > >>> > > > > > > > >>> > broker 4 is in sync, even though 3 wasn't
> actually
> > > >>> one of
> > > >>> > > the
> > > >>> > > > > > > > original
> > > >>> > > > > > > > >>> > assigned replicas and is no longer a new
> assigned
> > > >>> > replica.
> > > >>> > > I
> > > >>> > > > > > think
> > > >>> > > > > > > > this
> > > >>> > > > > > > > >>> > also affects the case where the reassignment is
> > > >>> cancelled
> > > >>> > > > > > > > >>> > ([1,2]->[2,3]->[1,2]): We again have to wait
> for 3 to
> > > >>> > catch
> > > >>> > > > up,
> > > >>> > > > > > > even
> > > >>> > > > > > > > >>> though
> > > >>> > > > > > > > >>> > its replica will then be deleted.
> > > >>> > > > > > > > >>> >
> > > >>> > > > > > > > >>> > Should we seek to improve this algorithm in
> this
> > > >>> KIP, or
> > > >>> > > > leave
> > > >>> > > > > > that
> > > >>> > > > > > > > as
> > > >>> > > > > > > > >>> a
> > > >>> > > > > > > > >>> > later optimisation?
> > > >>> > > > > > > > >>> >
> > > >>> > > > > > > > >>> > Cheers,
> > > >>> > > > > > > > >>> >
> > > >>> > > > > > > > >>> > Tom
> > > >>> > > > > > > > >>> >
> > > >>> > > > > > > > >>> > On 11 December 2017 at 21:31, Jun Rao <
> > > >>> j...@confluent.io>
> > > >>> > > > > wrote:
> > > >>> > > > > > > > >>> >
> > > >>> > > > > > > > >>> > > Another question is on the compatibility.
> Since now
> > > >>> > there
> > > >>> > > > > are 2
> > > >>> > > > > > > > ways
> > > >>> > > > > > > > >>> of
> > > >>> > > > > > > > >>> > > specifying a partition reassignment, one
> under
> > > >>> > > > > > > > >>> /admin/reassign_partitions
> > > >>> > > > > > > > >>> > > and the other under /admin/reassignments, we
> > > >>> probably
> > > >>> > > want
> > > >>> > > > to
> > > >>> > > > > > > > >>> prevent the
> > > >>> > > > > > > > >>> > > same topic being reassigned under both paths
> at the
> > > >>> > same
> > > >>> > > > > time?
> > > >>> > > > > > > > >>> > > Thanks,
> > > >>> > > > > > > > >>> > >
> > > >>> > > > > > > > >>> > > Jun
> > > >>> > > > > > > > >>> > >
> > > >>> > > > > > > > >>> > >
> > > >>> > > > > > > > >>> > >
> > > >>> > > > > > > > >>> > > On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao <
> > > >>> > > j...@confluent.io>
> > > >>> > > > > > > wrote:
> > > >>> > > > > > > > >>> > >
> > > >>> > > > > > > > >>> > > > Hi, Tom,
> > > >>> > > > > > > > >>> > > >
> > > >>> > > > > > > > >>> > > > Thanks for the KIP. It definitely
> addresses one
> > > >>> of
> > > >>> > the
> > > >>> > > > pain
> > > >>> > > > > > > > points
> > > >>> > > > > > > > >>> in
> > > >>> > > > > > > > >>> > > > partition reassignment. Another issue that
> it
> > > >>> also
> > > >>> > > > > addresses
> > > >>> > > > > > is
> > > >>> > > > > > > > >>> the ZK
> > > >>> > > > > > > > >>> > > node
> > > >>> > > > > > > > >>> > > > size limit when writing the reassignment
> JSON.
> > > >>> > > > > > > > >>> > > >
> > > >>> > > > > > > > >>> > > > My only concern is that the KIP needs to
> create
> > > >>> one
> > > >>> > > > watcher
> > > >>> > > > > > per
> > > >>> > > > > > > > >>> > > reassigned
> > > >>> > > > > > > > >>> > > > partition. This could add overhead in ZK
> and
> > > >>> > complexity
> > > >>> > > > for
> > > >>> > > > > > > > >>> debugging
> > > >>> > > > > > > > >>> > > when
> > > >>> > > > > > > > >>> > > > lots of partitions are being reassigned
> > > >>> > simultaneously.
> > > >>> > > > We
> > > >>> > > > > > > could
> > > >>> > > > > > > > >>> > > > potentially improve this by introducing a
> > > >>> separate ZK
> > > >>> > > > path
> > > >>> > > > > > for
> > > >>> > > > > > > > >>> change
> > > >>> > > > > > > > >>> > > > notification as we do for configs. For
> example,
> > > >>> every
> > > >>> > > > time
> > > >>> > > > > we
> > > >>> > > > > > > > >>> change
> > > >>> > > > > > > > >>> > the
> > > >>> > > > > > > > >>> > > > assignment for a set of partitions, we
> could
> > > >>> further
> > > >>> > > > write
> > > >>> > > > > a
> > > >>> > > > > > > > >>> sequential
> > > >>> > > > > > > > >>> > > > node
> /admin/reassignment_changes/[change_x]. That
> > > >>> > way,
> > > >>> > > > the
> > > >>> > > > > > > > >>> controller
> > > >>> > > > > > > > >>> > > > only needs to watch the change path. Once a
> > > >>> change is
> > > >>> > > > > > > triggered,
> > > >>> > > > > > > > >>> the
> > > >>> > > > > > > > >>> > > > controller can read everything under
> > > >>> > > > /admin/reassignments/.
> > > >>> > > > > > > > >>> > > >
> > > >>> > > > > > > > >>> > > > Jun
> > > >>> > > > > > > > >>> > > >
> > > >>> > > > > > > > >>> > > >
> > > >>> > > > > > > > >>> > > > On Wed, Dec 6, 2017 at 1:19 PM, Tom
> Bentley <
> > > >>> > > > > > > > t.j.bent...@gmail.com
> > > >>> > > > > > > > >>> >
> > > >>> > > > > > > > >>> > > wrote:
> > > >>> > > > > > > > >>> > > >
> > > >>> > > > > > > > >>> > > >> Hi,
> > > >>> > > > > > > > >>> > > >>
> > > >>> > > > > > > > >>> > > >> This is still very new, but I wanted some
> quick
> > > >>> > > feedback
> > > >>> > > > > on
> > > >>> > > > > > a
> > > >>> > > > > > > > >>> > > preliminary
> > > >>> > > > > > > > >>> > > >> KIP which could, I think, help with
> providing an
> > > >>> > > > > AdminClient
> > > >>> > > > > > > API
> > > >>> > > > > > > > >>> for
> > > >>> > > > > > > > >>> > > >> partition reassignment.
> > > >>> > > > > > > > >>> > > >>
> > > >>> > > > > > > > >>> > > >> https://cwiki.apache.org/
> > > >>> > > confluence/display/KAFKA/KIP-
> > > >>> > > > > 236%
> > > >>> > > > > > > > >>

Reply via email to