Read through the KIP and I have one comment:

It seems like it is not looking strictly for cancellation but also
implements rolling back to the original. I think it'd be much simpler to
generate a reassignment json on cancellation that contains the original
assignment and start a new partition reassignment completely. This way the
reassignment algorithm (whatever it is) could be reused as a whole. Did you
consider this or are there any obstacles that prevents doing this?

Regards,
Viktor

On Fri, Feb 22, 2019 at 2:24 PM Viktor Somogyi-Vass <viktorsomo...@gmail.com>
wrote:

> I've published the above mentioned KIP here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-435%3A+Incremental+Partition+Reassignment
> Will start a discussion about it soon.
>
> On Fri, Feb 22, 2019 at 12:45 PM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com> wrote:
>
>> Hi Folks,
>>
>> I also have a pending active work on the incremental partition
>> reassignment stuff here: https://issues.apache.org/jira/browse/KAFKA-6794
>> I think it would be good to cooperate on this to make both work
>> compatible with each other.
>>
>> I'll write up a KIP about this today so it'll be easier to see how to fit
>> the two together. Basically in my work I operate on the
>> /admin/reassign_partitions node on a fully compatible way, meaning I won't
>> change it just calculate each increment based on that and the current state
>> of the ISR set for the partition in reassignment.
>> I hope we could collaborate on this.
>>
>> Viktor
>>
>> On Thu, Feb 21, 2019 at 9:04 PM Harsha <ka...@harsha.io> wrote:
>>
>>> Thanks George. LGTM.
>>> Jun & Tom, Can you please take a look at the updated KIP.
>>> Thanks,
>>> Harsha
>>>
>>> On Wed, Feb 20, 2019, at 12:18 PM, George Li wrote:
>>> > Hi,
>>> >
>>> > After discussing with Tom, Harsha and I are picking up KIP-236 <
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment>.
>>> The work focused on safely/cleanly cancel / rollback pending reassignments
>>> in a timely fashion. Pull Request #6296 <
>>> https://github.com/apache/kafka/pull/6296> Still working on more
>>> integration/system tests.
>>> >
>>> > Please review and provide feedbacks/suggestions.
>>> >
>>> > Thanks,
>>> > George
>>> >
>>> >
>>> > On Saturday, December 23, 2017, 0:51:13 GMT, Jun Rao <j...@confluent.io>
>>> wrote:
>>> >
>>> > Hi, Tom,
>>>
>>> Thanks for the reply.
>>>
>>> 10. That's a good thought. Perhaps it's better to get rid of
>>> /admin/reassignment_requests
>>> too. The window when a controller is not available is small. So, we can
>>> just failed the admin client if the controller is not reachable after the
>>> timeout.
>>>
>>> 13. With the changes in 10, the old approach is handled through ZK
>>> callback
>>> and the new approach is through Kafka RPC. The ordering between the two
>>> is
>>> kind of arbitrary. Perhaps the ordering can just be based on the order
>>> that
>>> the reassignment is added to the controller request queue. From there, we
>>> can either do the overriding or the prevention.
>>>
>>> Jun
>>>
>>>
>>> On Fri, Dec 22, 2017 at 7:31 AM, Tom Bentley <t.j.bent...@gmail.com>
>>> wrote:
>>>
>>> > Hi Jun,
>>> >
>>> > Thanks for responding, my replies are inline:
>>> >
>>> > 10. You explanation makes sense. My remaining concern is the
>>> additional ZK
>>> > > writes in the proposal. With the proposal, we will need to do
>>> following
>>> > > writes in ZK.
>>> > >
>>> > > a. write new assignment in /admin/reassignment_requests
>>> > >
>>> > > b. write new assignment and additional metadata in
>>> > > /admin/reassignments/$topic/$partition
>>> > >
>>> > > c. write old + new assignment  in /brokers/topics/[topic]
>>> > >
>>> > > d. write new assignment in /brokers/topics/[topic]
>>> > >
>>> > > e. delete /admin/reassignments/$topic/$partition
>>> > >
>>> > > So, there are quite a few ZK writes. I am wondering if it's better to
>>> > > consolidate the info in /admin/reassignments/$topic/$partition into
>>> > > /brokers/topics/[topic].
>>> > > For example, we can just add some new JSON fields in
>>> > > /brokers/topics/[topic]
>>> > > to remember the new assignment and potentially the original replica
>>> count
>>> > > when doing step c. Those fields with then be removed in step d. That
>>> way,
>>> > > we can get rid of step b and e, saving 2 ZK writes per partition.
>>> > >
>>> >
>>> > This seems like a great idea to me.
>>> >
>>> > It might also be possible to get rid of the
>>> /admin/reassignment_requests
>>> > subtree too. I've not yet published the ideas I have for the
>>> AdminClient
>>> > API for reassigning partitions, but given the existence of such an
>>> API, the
>>> > route to starting a reassignment would be the AdminClient, and not
>>> > zookeeper. In that case there is no need for
>>> /admin/reassignment_requests
>>> > at all. The only drawback that I can see is that while it's currently
>>> > possible to trigger a reassignment even during a controller
>>> > election/failover that would no longer be the case if all requests had
>>> to
>>> > go via the controller.
>>> >
>>> >
>>> > > 11. What you described sounds good. We could potentially optimize the
>>> > > dropped replicas a bit more. Suppose that assignment [0,1,2] is first
>>> > > changed to [1,2,3] and then to [2,3,4]. When initiating the second
>>> > > assignment, we may end up dropping replica 3 and only to restart it
>>> > again.
>>> > > In this case, we could only drop a replica if it's not going to be
>>> added
>>> > > back again.
>>> > >
>>> >
>>> > I had missed that, thank you! I will update the proposed algorithm to
>>> > prevent this.
>>> >
>>> >
>>> > > 13. Since this is a corner case, we can either prevent or allow
>>> > overriding
>>> > > with old/new mechanisms. To me, it seems that allowing is simpler to
>>> > > implement, the order in /admin/reassignment_requests determines the
>>> > > ordering the of override, whether that's initiated by the new way or
>>> the
>>> > > old way.
>>> > >
>>> >
>>> > That makes sense except for the corner case where:
>>> >
>>> > * There is no current controller and
>>> > * Writes to both the new and old znodes happen
>>> >
>>> > On election of the new controller, for those partitions with both a
>>> > reassignment_request and in /admin/reassign_partitions, we have to
>>> decide
>>> > which should win. You could use the modification time, though there are
>>> > some very unlikely scenarios where that doesn't work properly, for
>>> example
>>> > if both znodes have the same mtime, or the /admin/reassign_partitions
>>> was
>>> > updated, but the assignment of the partition wasn't changed, like this:
>>> >
>>> > 0. /admin/reassign_partitions has my-topic/42 = [1,2,3]
>>> > 1. Controller stops watching.
>>> > 2. Create /admin/reassignment_requests/request_1234 to change the
>>> > reassignment of partition my-topic/42 = [4,5,6]
>>> > 3. Update /admin/reassign_partitions to add your-topic/12=[7,8,9]
>>> > 4. New controller resumes
>>> >
>>> >
>>> >
>>> > > Thanks,
>>> > >
>>> > > Jun
>>> > >
>>> > > On Tue, Dec 19, 2017 at 2:43 AM, Tom Bentley <t.j.bent...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi Jun,
>>> > > >
>>> > > > 10. Another concern of mine is on consistency with the current
>>> pattern.
>>> > > The
>>> > > > > current pattern for change notification based on ZK is (1) we
>>> first
>>> > > write
>>> > > > > the actual value in the entity path and then write the change
>>> > > > notification
>>> > > > > path, and (2)  the change notification path only includes what
>>> entity
>>> > > has
>>> > > > > changed but not the actual changes. If we want to follow this
>>> pattern
>>> > > for
>>> > > > > consistency, /admin/reassignment_requests/request_xxx will only
>>> have
>>> > > the
>>> > > > > partitions whose reassignment have changed, but not the actual
>>> > > > > reassignment.
>>> > > > >
>>> > > >
>>> > > > Ah, I hadn't understood part (2). That means my concern about
>>> > efficiency
>>> > > > with the current pattern is misplaced. There are still some
>>> interesting
>>> > > > differences in semantics, however:
>>> > > >
>>> > > > a) The mechanism currently proposed in KIP-236 means that the
>>> > controller
>>> > > is
>>> > > > the only writer to /admin/reassignments. This means it can include
>>> > > > information in these znodes that requesters might not know, or
>>> > > information
>>> > > > that's necessary to perform the reassignment but not necessary to
>>> > > describe
>>> > > > the request. While this could be handled using the current pattern
>>> it
>>> > > would
>>> > > > rely on all  writers to preserve any information added by the
>>> > controller,
>>> > > > which seems complicated and hence fragile.
>>> > > >
>>> > > > b) The current pattern for change notification doesn't cope with
>>> > > competing
>>> > > > writers to the entity path: If two processes write to the entity
>>> path
>>> > > > before the controller can read it (due to notification) then one
>>> set of
>>> > > > updates will be lost.
>>> > > >
>>> > > > c) If a single writing process crashes after writing to the entity
>>> > path,
>>> > > > but before writing to the notification path then the write will be
>>> > lost.
>>> > > >
>>> > > > I'm actually using point a) in my WIP (see below). Points b) and
>>> c) are
>>> > > > obviously edge cases.
>>> > > >
>>> > > >
>>> > > > > 11. Ok. I am not sure that I fully understand the description of
>>> that
>>> > > > part.
>>> > > > > Does "assigned" refer to the current assignment? Could you also
>>> > > describe
>>> > > > > where the length of the original assignment is stored in ZK?
>>> > > > >
>>> > > >
>>> > > > Sorry if the description is not clear. Yes, "assigned" referrs to
>>> the
>>> > > > currently assigned replicas (taken from the
>>> > > > ControllerContext.partitionReplicaAssignment). I would store the
>>> > length
>>> > > of
>>> > > > the original assignment in the
>>> /admin/reassignments/$topic/$partition
>>> > > > znode
>>> > > > (this is where the point (a) above is useful -- the requester
>>> shouldn't
>>> > > > know that this information is used by the controller).
>>> > > >
>>> > > > I've updated the KIP to make these points clearer.
>>> > > >
>>> > > >
>>> > > > > 13. Hmm, I am not sure that the cancellation needs to be done
>>> for the
>>> > > > whole
>>> > > > > batch. The reason that I brought this up is for consistency. The
>>> KIP
>>> > > > allows
>>> > > > > override when using the new approach. It just seems that it's
>>> simpler
>>> > > to
>>> > > > > extend this model when resolving multiple changes between the
>>> old and
>>> > > the
>>> > > > > new approach.
>>> > > >
>>> > > >
>>> > > > Ah, I think I've been unclear on this point too. Currently the
>>> > > > ReassignPartitionsCommand enforces that you can't change
>>> reassignments,
>>> > > but
>>> > > > this doesn't stop other ZK clients making changes to
>>> > > > /admin/reassign_partitions directly and I believe some Kafka users
>>> do
>>> > > > indeed change reassignments in-flight by writing to
>>> > > > /admin/reassign_partitions. What I'm proposing doesn't break that
>>> at
>>> > all.
>>> > > > The semantic I've implemented is only that the controller only
>>> refuses
>>> > a
>>> > > > reassignment change if there is already one in-flight (i.e. in
>>> > > > /admin/reassignments/$topic/$partition) **via the other
>>> mechansim**.
>>> > So
>>> > > if
>>> > > > you're using /admin/reassign_partitions and you change or cancel
>>> part
>>> > of
>>> > > it
>>> > > > via /admin/reassign_partitions, that's OK. Likewise if you're using
>>> > > > /admin/reassignment_request/request_xxx and you change or cancel
>>> part
>>> > of
>>> > > > it
>>> > > > via another /admin/reassignment_request/request_xxx, that's OK.What
>>> > you
>>> > > > can't do is change a request that was started via
>>> > > > /admin/reassign_partitions via /admin/reassignment_request/
>>> > request_xxx,
>>> > > or
>>> > > > vice versa.
>>> > > >
>>> > > > What I was thinking of when I replied is the case where, on
>>> controller
>>> > > > failover, /admin/reassign_partitions has been changed and
>>> > > > /admin/reassignment_request/request_xxx created (in the period when
>>> > the
>>> > > > new
>>> > > > controller was being elected, for example) with a common
>>> partition. In
>>> > > this
>>> > > > case we should apply a consistent rule to that used when the
>>> > notification
>>> > > > happen in real time. Your suggestion to use the modification time
>>> of
>>> > the
>>> > > > znode would work here too (except in the edge case where ZK writes
>>> to
>>> > > both
>>> > > > znodes happen within the same clock tick on the ZK server, so the
>>> > mtimes
>>> > > > are the same).
>>> > > >
>>> > > > Let me know if you think this is the right semantic and I'll try to
>>> > > clarify
>>> > > > the KIP.
>>> > > >
>>> > > > Many thanks,
>>> > > >
>>> > > > Tom
>>> > > >
>>> > > > On 18 December 2017 at 18:12, Jun Rao <j...@confluent.io> wrote:
>>> > > >
>>> > > > > Hi, Tom,
>>> > > > >
>>> > > > > Thanks for the reply. A few more followup comments below.
>>> > > > >
>>> > > > > 10. Another concern of mine is on consistency with the current
>>> > pattern.
>>> > > > The
>>> > > > > current pattern for change notification based on ZK is (1) we
>>> first
>>> > > write
>>> > > > > the actual value in the entity path and then write the change
>>> > > > notification
>>> > > > > path, and (2)  the change notification path only includes what
>>> entity
>>> > > has
>>> > > > > changed but not the actual changes. If we want to follow this
>>> pattern
>>> > > for
>>> > > > > consistency, /admin/reassignment_requests/request_xxx will only
>>> have
>>> > > the
>>> > > > > partitions whose reassignment have changed, but not the actual
>>> > > > > reassignment.
>>> > > > >
>>> > > > > 11. Ok. I am not sure that I fully understand the description of
>>> that
>>> > > > part.
>>> > > > > Does "assigned" refer to the current assignment? Could you also
>>> > > describe
>>> > > > > where the length of the original assignment is stored in ZK?
>>> > > > >
>>> > > > > 13. Hmm, I am not sure that the cancellation needs to be done
>>> for the
>>> > > > whole
>>> > > > > batch. The reason that I brought this up is for consistency. The
>>> KIP
>>> > > > allows
>>> > > > > override when using the new approach. It just seems that it's
>>> simpler
>>> > > to
>>> > > > > extend this model when resolving multiple changes between the
>>> old and
>>> > > the
>>> > > > > new approach.
>>> > > > >
>>> > > > > Jun
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > On Mon, Dec 18, 2017 at 2:45 AM, Tom Bentley <
>>> t.j.bent...@gmail.com>
>>> > > > > wrote:
>>> > > > >
>>> > > > > > Hi Jun,
>>> > > > > >
>>> > > > > > Thanks for replying, some answers below:
>>> > > > > >
>>> > > > > >
>>> > > > > > > 10. The proposal now stores the reassignment for all
>>> partitions
>>> > in
>>> > > > > > > /admin/reassignment_requests/request_xxx. If the number of
>>> > > > reassigned
>>> > > > > > > partitions is larger, the ZK write may hit the default 1MB
>>> limit
>>> > > and
>>> > > > > > fail.
>>> > > > > > > An alternative approach is to have the reassignment requester
>>> > first
>>> > > > > write
>>> > > > > > > the new assignment for each partition under
>>> > > > > > > /admin/reassignments/$topic/$partition and then write
>>> > > > > > > /admin/reassignment_requests/request_xxx with an empty value.
>>> > The
>>> > > > > > > controller can then read all values under
>>> /admin/reassignments.
>>> > > > > > >
>>> > > > > >
>>> > > > > > You're right that reassigning enough partitions would hit the
>>> 1MB
>>> > > > limit,
>>> > > > > > but I don't think this would be a problem in practice because
>>> it
>>> > > would
>>> > > > be
>>> > > > > > trivial to split the partitions into several requests (i.e.
>>> > mutleiple
>>> > > > > > request_xxx).
>>> > > > > > I don't think the non-atomicity this would imply is a problem.
>>> By
>>> > > > writing
>>> > > > > > the partitions whose /admin/reassignments/$topic/$partition has
>>> > been
>>> > > > > > created or changed it makes it much more efficient to know
>>> which of
>>> > > > those
>>> > > > > > znodes we need to read. If I understand your suggestion, you
>>> would
>>> > > have
>>> > > > > to
>>> > > > > > read every node under /admin/reassignments to figure out which
>>> had
>>> > > > > changed.
>>> > > > > >
>>> > > > > >
>>> > > > > > > 11. The improvement you suggested in
>>> onPartitionReassignment()
>>> > > sounds
>>> > > > > > good
>>> > > > > > > at the high level. The computation of those dropped
>>> partitions
>>> > > seems
>>> > > > a
>>> > > > > > bit
>>> > > > > > > complicated. Perhaps a simple approach is to drop the
>>> replicas
>>> > not
>>> > > in
>>> > > > > the
>>> > > > > > > original assignment and newest reassignment?
>>> > > > > > >
>>> > > > > >
>>> > > > > > This was what I came up with originally too, but when I looked
>>> into
>>> > > > > > implementing it I found a couple of things which made me
>>> reconsider
>>> > > it.
>>> > > > > > Consider the reassignments [0,1] -> [2,3] -> [3,4]. In words:
>>> we
>>> > > start
>>> > > > > > reassigning to [2,3], but then change our minds about 2 and
>>> switch
>>> > it
>>> > > > to
>>> > > > > 4
>>> > > > > > (maybe we've figured out a better overall balance). At that
>>> point
>>> > it
>>> > > is
>>> > > > > > perfectly possible that broker 2 is in-sync and broker 1 is not
>>> > > > in-sync.
>>> > > > > It
>>> > > > > > seems silly to drop broker 2 in favour of broker 1: We're
>>> > needlessly
>>> > > > > giving
>>> > > > > > the cluster more work to do.
>>> > > > > >
>>> > > > > > The second thing that made me reconsider was in that same
>>> scenario
>>> > > it's
>>> > > > > > even possible that broker 2 is the leader of the partition.
>>> > Obviously
>>> > > > we
>>> > > > > > can elect a new leader before dropping it, but not without
>>> causing
>>> > > > > > disruption to producers and consumers.
>>> > > > > >
>>> > > > > > By accepting a little more complexity in choosing which
>>> brokers to
>>> > > drop
>>> > > > > we
>>> > > > > > make the dropping simpler (no need for leader election) and
>>> ensure
>>> > > the
>>> > > > > > cluster has less work to do.
>>> > > > > >
>>> > > > > >
>>> > > > > > > 12. You brought up the need of remembering the original
>>> > assignment.
>>> > > > > This
>>> > > > > > > will be lost if the assignment is changed multiple times if
>>> we
>>> > > follow
>>> > > > > the
>>> > > > > > > approach described in 10. One way is to store the original
>>> > > assignment
>>> > > > > in
>>> > > > > > > /brokers/topics/[topic] as the following. When the final
>>> > > reassignment
>>> > > > > > > completes, we can remove the original field.
>>> > > > > > > {
>>> > > > > > >   "version": 1,
>>> > > > > > >   "partitions": {"0": [0, 1, 3] },
>>> > > > > > >   "originals": {"0": [0, 1, 2] }
>>> > > > > > > }
>>> > > > > > >
>>> > > > > >
>>> > > > > > While I was implementing my first version of
>>> > > onPartitionReassignment(),
>>> > > > > > where I preferred the originals, I was storing the originals
>>> in the
>>> > > > > > /admin/reassignments/$topic/$partition znodes. Since we will
>>> > remove
>>> > > > that
>>> > > > > > znode at the end of reassignment anyway, I would suggest this
>>> is a
>>> > > > better
>>> > > > > > place to store that data (if it's necessary to do so), so that
>>> we
>>> > can
>>> > > > > avoid
>>> > > > > > another ZK round trip.
>>> > > > > >
>>> > > > > >
>>> > > > > > > 13. For resolving the conflict between
>>> /admin/reassign_partitions
>>> > > and
>>> > > > > > > /admin/reassignments/$topic/$partition, perhaps it's more
>>> > natural
>>> > > to
>>> > > > > > just
>>> > > > > > > let the assignment with a newer timestamp to override the
>>> older
>>> > > one?
>>> > > > > > >
>>> > > > > >
>>> > > > > > That would work but with slightly different semantics to what I
>>> > have:
>>> > > > > Since
>>> > > > > > /admin/reassign_partitions contains multiple partitions, using
>>> the
>>> > > > > > timestamp means the whole batch wins or losses. By tracking how
>>> > each
>>> > > > > > request was made we can be more fine-grained. I'm to use the
>>> > > > modification
>>> > > > > > time if such granularity is not required.
>>> > > > > >
>>> > > > > >
>>> > > > > > > 14. Implementation wise, currently, we register a watcher of
>>> the
>>> > > isr
>>> > > > > path
>>> > > > > > > of each partition being reassigned. This has the potential
>>> issue
>>> > of
>>> > > > > > > registering many listeners. An improvement could be just
>>> > > piggybacking
>>> > > > > on
>>> > > > > > > the existing IsrChangeNotificationHandler, which only
>>> watches a
>>> > > > single
>>> > > > > ZK
>>> > > > > > > path and is triggered on a batch of isr changes. This is
>>> kind of
>>> > > > > > orthogonal
>>> > > > > > > to the KIP. However, if we are touching the reassignment
>>> logic,
>>> > it
>>> > > > may
>>> > > > > be
>>> > > > > > > worth considering.
>>> > > > > >
>>> > > > > >
>>> > > > > > Let me look into that.
>>> > > > > >
>>> > > > > > Thanks,
>>> > > > > >
>>> > > > > > Tom
>>> > > > > >
>>> > > > > > On 16 December 2017 at 02:19, Jun Rao <j...@confluent.io>
>>> wrote:
>>> > > > > >
>>> > > > > > > Hi, Tom,
>>> > > > > > >
>>> > > > > > > Thanks for the updated KIP. A few more comments below.
>>> > > > > > >
>>> > > > > > > 10. The proposal now stores the reassignment for all
>>> partitions
>>> > in
>>> > > > > > > /admin/reassignment_requests/request_xxx. If the number of
>>> > > > reassigned
>>> > > > > > > partitions is larger, the ZK write may hit the default 1MB
>>> limit
>>> > > and
>>> > > > > > fail.
>>> > > > > > > An alternative approach is to have the reassignment requester
>>> > first
>>> > > > > write
>>> > > > > > > the new assignment for each partition under
>>> > > > > > > /admin/reassignments/$topic/$partition and then write
>>> > > > > > > /admin/reassignment_requests/request_xxx with an empty value.
>>> > The
>>> > > > > > > controller can then read all values under
>>> /admin/reassignments.
>>> > > > > > >
>>> > > > > > > 11. The improvement you suggested in
>>> onPartitionReassignment()
>>> > > sounds
>>> > > > > > good
>>> > > > > > > at the high level. The computation of those dropped
>>> partitions
>>> > > seems
>>> > > > a
>>> > > > > > bit
>>> > > > > > > complicated. Perhaps a simple approach is to drop the
>>> replicas
>>> > not
>>> > > in
>>> > > > > the
>>> > > > > > > original assignment and newest reassignment?
>>> > > > > > >
>>> > > > > > > 12. You brought up the need of remembering the original
>>> > assignment.
>>> > > > > This
>>> > > > > > > will be lost if the assignment is changed multiple times if
>>> we
>>> > > follow
>>> > > > > the
>>> > > > > > > approach described in 10. One way is to store the original
>>> > > assignment
>>> > > > > in
>>> > > > > > > /brokers/topics/[topic] as the following. When the final
>>> > > reassignment
>>> > > > > > > completes, we can remove the original field.
>>> > > > > > > {
>>> > > > > > >   "version": 1,
>>> > > > > > >   "partitions": {"0": [0, 1, 3] },
>>> > > > > > >   "originals": {"0": [0, 1, 2] }
>>> > > > > > > }
>>> > > > > > >
>>> > > > > > > 13. For resolving the conflict between
>>> /admin/reassign_partitions
>>> > > and
>>> > > > > > > /admin/reassignments/$topic/$partition, perhaps it's more
>>> > natural
>>> > > to
>>> > > > > > just
>>> > > > > > > let the assignment with a newer timestamp to override the
>>> older
>>> > > one?
>>> > > > > > >
>>> > > > > > > 14. Implementation wise, currently, we register a watcher of
>>> the
>>> > > isr
>>> > > > > path
>>> > > > > > > of each partition being reassigned. This has the potential
>>> issue
>>> > of
>>> > > > > > > registering many listeners. An improvement could be just
>>> > > piggybacking
>>> > > > > on
>>> > > > > > > the existing IsrChangeNotificationHandler, which only
>>> watches a
>>> > > > single
>>> > > > > ZK
>>> > > > > > > path and is triggered on a batch of isr changes. This is
>>> kind of
>>> > > > > > orthogonal
>>> > > > > > > to the KIP. However, if we are touching the reassignment
>>> logic,
>>> > it
>>> > > > may
>>> > > > > be
>>> > > > > > > worth considering.
>>> > > > > > >
>>> > > > > > > Thanks,
>>> > > > > > >
>>> > > > > > > Jun
>>> > > > > > >
>>> > > > > > > On Fri, Dec 15, 2017 at 10:17 AM, Tom Bentley <
>>> > > t.j.bent...@gmail.com
>>> > > > >
>>> > > > > > > wrote:
>>> > > > > > >
>>> > > > > > > > Just wanted to mention that I've started KIP-240, which
>>> builds
>>> > on
>>> > > > top
>>> > > > > > of
>>> > > > > > > > this one to provide an AdminClient API for listing and
>>> > describing
>>> > > > > > > > reassignments.
>>> > > > > > > >
>>> > > > > > > > On 15 December 2017 at 14:34, Tom Bentley <
>>> > t.j.bent...@gmail.com
>>> > > >
>>> > > > > > wrote:
>>> > > > > > > >
>>> > > > > > > > > > Should we seek to improve this algorithm in this KIP,
>>> or
>>> > > leave
>>> > > > > that
>>> > > > > > > as
>>> > > > > > > > > a later optimisation?
>>> > > > > > > > >
>>> > > > > > > > > I've updated the KIP with a proposed algorithm.
>>> > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > > > On 14 December 2017 at 09:57, Tom Bentley <
>>> > > t.j.bent...@gmail.com
>>> > > > >
>>> > > > > > > wrote:
>>> > > > > > > > >
>>> > > > > > > > >> Thanks Ted, now fixed.
>>> > > > > > > > >>
>>> > > > > > > > >> On 13 December 2017 at 18:38, Ted Yu <
>>> yuzhih...@gmail.com>
>>> > > > wrote:
>>> > > > > > > > >>
>>> > > > > > > > >>> Tom:
>>> > > > > > > > >>> bq. create a znode
>>> /admin/reassignments/$topic-$partition
>>> > > > > > > > >>>
>>> > > > > > > > >>> Looks like the tree structure above should be:
>>> > > > > > > > >>>
>>> > > > > > > > >>> /admin/reassignments/$topic/$partition
>>> > > > > > > > >>>
>>> > > > > > > > >>> bq. The controller removes /admin/reassignment/$topic/$
>>> > > > partition
>>> > > > > > > > >>>
>>> > > > > > > > >>> Note the lack of 's' for reassignment. It would be
>>> good to
>>> > > make
>>> > > > > > > > zookeeper
>>> > > > > > > > >>> paths consistent.
>>> > > > > > > > >>>
>>> > > > > > > > >>> Thanks
>>> > > > > > > > >>>
>>> > > > > > > > >>> On Wed, Dec 13, 2017 at 9:49 AM, Tom Bentley <
>>> > > > > > t.j.bent...@gmail.com>
>>> > > > > > > > >>> wrote:
>>> > > > > > > > >>>
>>> > > > > > > > >>> > Hi Jun and Ted,
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > Jun, you're right that needing one watcher per
>>> reassigned
>>> > > > > > partition
>>> > > > > > > > >>> > presents a scalability problem, and using a separate
>>> > > > > notification
>>> > > > > > > > path
>>> > > > > > > > >>> > solves that. I also agree that it makes sense to
>>> prevent
>>> > > > users
>>> > > > > > from
>>> > > > > > > > >>> using
>>> > > > > > > > >>> > both methods on the same reassignment.
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > Ted, naming the reassignments like mytopic-42 was
>>> simpler
>>> > > > > while I
>>> > > > > > > was
>>> > > > > > > > >>> > proposing a watcher-per-reassignment (I'd have
>>> needed a
>>> > > child
>>> > > > > > > watcher
>>> > > > > > > > >>> on
>>> > > > > > > > >>> > /admin/reassignments and also on
>>> > > > /admin/reassignments/mytopic).
>>> > > > > > > Using
>>> > > > > > > > >>> the
>>> > > > > > > > >>> > separate notification path means I don't need any
>>> > watchers
>>> > > in
>>> > > > > the
>>> > > > > > > > >>> > /admin/reassignments subtree, so switching to
>>> > > > > > > > >>> /admin/reassignments/mytopic/
>>> > > > > > > > >>> > 42
>>> > > > > > > > >>> > would work, and avoid /admin/reassignments having a
>>> very
>>> > > > large
>>> > > > > > > number
>>> > > > > > > > >>> of
>>> > > > > > > > >>> > child nodes. On the other hand it also means I have
>>> to
>>> > > create
>>> > > > > and
>>> > > > > > > > >>> delete
>>> > > > > > > > >>> > the topic nodes (e.g. /admin/reassignments/mytopic),
>>> > which
>>> > > > > incurs
>>> > > > > > > the
>>> > > > > > > > >>> cost
>>> > > > > > > > >>> > of extra round trips to zookeeper. I suppose that
>>> since
>>> > > > > > > reassignment
>>> > > > > > > > is
>>> > > > > > > > >>> > generally a slow process it makes little difference
>>> if we
>>> > > > > > increase
>>> > > > > > > > the
>>> > > > > > > > >>> > latency of the interactions with zookeeper.
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > I have updated the KIP with these improvements, and a
>>> > more
>>> > > > > > detailed
>>> > > > > > > > >>> > description of exactly how we would manage these
>>> znodes.
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > Reading the algorithm in KafkaController.
>>> > > > > > > onPartitionReassignment(),
>>> > > > > > > > it
>>> > > > > > > > >>> > seems that it would be suboptimal for changing
>>> > > reassignments
>>> > > > > > > > in-flight.
>>> > > > > > > > >>> > Consider an initial assignment of [1,2], reassigned
>>> to
>>> > > [2,3]
>>> > > > > and
>>> > > > > > > then
>>> > > > > > > > >>> > changed to [2,4]. Broker 3 will remain in the
>>> assigned
>>> > > > replicas
>>> > > > > > > until
>>> > > > > > > > >>> > broker 4 is in sync, even though 3 wasn't actually
>>> one of
>>> > > the
>>> > > > > > > > original
>>> > > > > > > > >>> > assigned replicas and is no longer a new assigned
>>> > replica.
>>> > > I
>>> > > > > > think
>>> > > > > > > > this
>>> > > > > > > > >>> > also affects the case where the reassignment is
>>> cancelled
>>> > > > > > > > >>> > ([1,2]->[2,3]->[1,2]): We again have to wait for 3 to
>>> > catch
>>> > > > up,
>>> > > > > > > even
>>> > > > > > > > >>> though
>>> > > > > > > > >>> > its replica will then be deleted.
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > Should we seek to improve this algorithm in this
>>> KIP, or
>>> > > > leave
>>> > > > > > that
>>> > > > > > > > as
>>> > > > > > > > >>> a
>>> > > > > > > > >>> > later optimisation?
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > Cheers,
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > Tom
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > On 11 December 2017 at 21:31, Jun Rao <
>>> j...@confluent.io>
>>> > > > > wrote:
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > > Another question is on the compatibility. Since now
>>> > there
>>> > > > > are 2
>>> > > > > > > > ways
>>> > > > > > > > >>> of
>>> > > > > > > > >>> > > specifying a partition reassignment, one under
>>> > > > > > > > >>> /admin/reassign_partitions
>>> > > > > > > > >>> > > and the other under /admin/reassignments, we
>>> probably
>>> > > want
>>> > > > to
>>> > > > > > > > >>> prevent the
>>> > > > > > > > >>> > > same topic being reassigned under both paths at the
>>> > same
>>> > > > > time?
>>> > > > > > > > >>> > > Thanks,
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> > > Jun
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> > > On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao <
>>> > > j...@confluent.io>
>>> > > > > > > wrote:
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> > > > Hi, Tom,
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > > Thanks for the KIP. It definitely addresses one
>>> of
>>> > the
>>> > > > pain
>>> > > > > > > > points
>>> > > > > > > > >>> in
>>> > > > > > > > >>> > > > partition reassignment. Another issue that it
>>> also
>>> > > > > addresses
>>> > > > > > is
>>> > > > > > > > >>> the ZK
>>> > > > > > > > >>> > > node
>>> > > > > > > > >>> > > > size limit when writing the reassignment JSON.
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > > My only concern is that the KIP needs to create
>>> one
>>> > > > watcher
>>> > > > > > per
>>> > > > > > > > >>> > > reassigned
>>> > > > > > > > >>> > > > partition. This could add overhead in ZK and
>>> > complexity
>>> > > > for
>>> > > > > > > > >>> debugging
>>> > > > > > > > >>> > > when
>>> > > > > > > > >>> > > > lots of partitions are being reassigned
>>> > simultaneously.
>>> > > > We
>>> > > > > > > could
>>> > > > > > > > >>> > > > potentially improve this by introducing a
>>> separate ZK
>>> > > > path
>>> > > > > > for
>>> > > > > > > > >>> change
>>> > > > > > > > >>> > > > notification as we do for configs. For example,
>>> every
>>> > > > time
>>> > > > > we
>>> > > > > > > > >>> change
>>> > > > > > > > >>> > the
>>> > > > > > > > >>> > > > assignment for a set of partitions, we could
>>> further
>>> > > > write
>>> > > > > a
>>> > > > > > > > >>> sequential
>>> > > > > > > > >>> > > > node /admin/reassignment_changes/[change_x]. That
>>> > way,
>>> > > > the
>>> > > > > > > > >>> controller
>>> > > > > > > > >>> > > > only needs to watch the change path. Once a
>>> change is
>>> > > > > > > triggered,
>>> > > > > > > > >>> the
>>> > > > > > > > >>> > > > controller can read everything under
>>> > > > /admin/reassignments/.
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > > Jun
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > > On Wed, Dec 6, 2017 at 1:19 PM, Tom Bentley <
>>> > > > > > > > t.j.bent...@gmail.com
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > > wrote:
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > >> Hi,
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >> This is still very new, but I wanted some quick
>>> > > feedback
>>> > > > > on
>>> > > > > > a
>>> > > > > > > > >>> > > preliminary
>>> > > > > > > > >>> > > >> KIP which could, I think, help with providing an
>>> > > > > AdminClient
>>> > > > > > > API
>>> > > > > > > > >>> for
>>> > > > > > > > >>> > > >> partition reassignment.
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >> https://cwiki.apache.org/
>>> > > confluence/display/KAFKA/KIP-
>>> > > > > 236%
>>> > > > > > > > >>> > > >> 3A+Interruptible+Partition+Reassignment
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >> I wasn't sure whether to start fleshing out a
>>> whole
>>> > > > > > > AdminClient
>>> > > > > > > > >>> API in
>>> > > > > > > > >>> > > >> this
>>> > > > > > > > >>> > > >> KIP (which would make it very big, and
>>> difficult to
>>> > > > read),
>>> > > > > > or
>>> > > > > > > > >>> whether
>>> > > > > > > > >>> > to
>>> > > > > > > > >>> > > >> break it down into smaller KIPs (which makes it
>>> > easier
>>> > > > to
>>> > > > > > read
>>> > > > > > > > and
>>> > > > > > > > >>> > > >> implement in pieces, but harder to get a
>>> high-level
>>> > > > > picture
>>> > > > > > of
>>> > > > > > > > the
>>> > > > > > > > >>> > > >> ultimate
>>> > > > > > > > >>> > > >> destination). For now I've gone for a very small
>>> > > initial
>>> > > > > > KIP,
>>> > > > > > > > but
>>> > > > > > > > >>> I'm
>>> > > > > > > > >>> > > >> happy
>>> > > > > > > > >>> > > >> to sketch the bigger picture here if people are
>>> > > > > interested.
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >> Cheers,
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >> Tom
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> >
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > On 11 December 2017 at 21:31, Jun Rao <
>>> j...@confluent.io>
>>> > > > > wrote:
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > > Another question is on the compatibility. Since now
>>> > there
>>> > > > > are 2
>>> > > > > > > > ways
>>> > > > > > > > >>> of
>>> > > > > > > > >>> > > specifying a partition reassignment, one under
>>> > > > > > > > >>> /admin/reassign_partitions
>>> > > > > > > > >>> > > and the other under /admin/reassignments, we
>>> probably
>>> > > want
>>> > > > to
>>> > > > > > > > >>> prevent the
>>> > > > > > > > >>> > > same topic being reassigned under both paths at the
>>> > same
>>> > > > > time?
>>> > > > > > > > >>> > > Thanks,
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> > > Jun
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> > > On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao <
>>> > > j...@confluent.io>
>>> > > > > > > wrote:
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> > > > Hi, Tom,
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > > Thanks for the KIP. It definitely addresses one
>>> of
>>> > the
>>> > > > pain
>>> > > > > > > > points
>>> > > > > > > > >>> in
>>> > > > > > > > >>> > > > partition reassignment. Another issue that it
>>> also
>>> > > > > addresses
>>> > > > > > is
>>> > > > > > > > >>> the ZK
>>> > > > > > > > >>> > > node
>>> > > > > > > > >>> > > > size limit when writing the reassignment JSON.
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > > My only concern is that the KIP needs to create
>>> one
>>> > > > watcher
>>> > > > > > per
>>> > > > > > > > >>> > > reassigned
>>> > > > > > > > >>> > > > partition. This could add overhead in ZK and
>>> > complexity
>>> > > > for
>>> > > > > > > > >>> debugging
>>> > > > > > > > >>> > > when
>>> > > > > > > > >>> > > > lots of partitions are being reassigned
>>> > simultaneously.
>>> > > > We
>>> > > > > > > could
>>> > > > > > > > >>> > > > potentially improve this by introducing a
>>> separate ZK
>>> > > > path
>>> > > > > > for
>>> > > > > > > > >>> change
>>> > > > > > > > >>> > > > notification as we do for configs. For example,
>>> every
>>> > > > time
>>> > > > > we
>>> > > > > > > > >>> change
>>> > > > > > > > >>> > the
>>> > > > > > > > >>> > > > assignment for a set of partitions, we could
>>> further
>>> > > > write
>>> > > > > a
>>> > > > > > > > >>> sequential
>>> > > > > > > > >>> > > > node /admin/reassignment_changes/[change_x]. That
>>> > way,
>>> > > > the
>>> > > > > > > > >>> controller
>>> > > > > > > > >>> > > > only needs to watch the change path. Once a
>>> change is
>>> > > > > > > triggered,
>>> > > > > > > > >>> the
>>> > > > > > > > >>> > > > controller can read everything under
>>> > > > /admin/reassignments/.
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > > Jun
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > > On Wed, Dec 6, 2017 at 1:19 PM, Tom Bentley <
>>> > > > > > > > t.j.bent...@gmail.com
>>> > > > > > > > >>> >
>>> > > > > > > > >>> > > wrote:
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > >> Hi,
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >> This is still very new, but I wanted some quick
>>> > > feedback
>>> > > > > on
>>> > > > > > a
>>> > > > > > > > >>> > > preliminary
>>> > > > > > > > >>> > > >> KIP which could, I think, help with providing an
>>> > > > > AdminClient
>>> > > > > > > API
>>> > > > > > > > >>> for
>>> > > > > > > > >>> > > >> partition reassignment.
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >> https://cwiki.apache.org/
>>> > > confluence/display/KAFKA/KIP-
>>> > > > > 236%
>>> > > > > > > > >>> > > >> 3A+Interruptible+Partition+Reassignment
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >> I wasn't sure whether to start fleshing out a
>>> whole
>>> > > > > > > AdminClient
>>> > > > > > > > >>> API in
>>> > > > > > > > >>> > > >> this
>>> > > > > > > > >>> > > >> KIP (which would make it very big, and
>>> difficult to
>>> > > > read),
>>> > > > > > or
>>> > > > > > > > >>> whether
>>> > > > > > > > >>> > to
>>> > > > > > > > >>> > > >> break it down into smaller KIPs (which makes it
>>> > easier
>>> > > > to
>>> > > > > > read
>>> > > > > > > > and
>>> > > > > > > > >>> > > >> implement in pieces, but harder to get a
>>> high-level
>>> > > > > picture
>>> > > > > > of
>>> > > > > > > > the
>>> > > > > > > > >>> > > >> ultimate
>>> > > > > > > > >>> > > >> destination). For now I've gone for a very small
>>> > > initial
>>> > > > > > KIP,
>>> > > > > > > > but
>>> > > > > > > > >>> I'm
>>> > > > > > > > >>> > > >> happy
>>> > > > > > > > >>> > > >> to sketch the bigger picture here if people are
>>> > > > > interested.
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >> Cheers,
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >> Tom
>>> > > > > > > > >>> > > >>
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > > >
>>> > > > > > > > >>> > >
>>> > > > > > > > >>> >
>>> > > > > > > > >>>
>>> > > > > > > > >>
>>> > > > > > > > >>
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> >
>>>
>>

Reply via email to