Hi Folks,

I've updated the KIP with the batching which would work on both replica and
partition level. To explain it briefly: for instance if the replica level
is set to 2 and partition level is set to 3, then 2x3=6 replica
reassignment would be in progress at the same time. In case of reassignment
for a single partition from (0, 1, 2, 3, 4) to (5, 6, 7, 8, 9) we would
form the batches (0, 1) → (5, 6); (2, 3) → (7, 8) and 4 → 9 and would
execute the reassignment in this order.

Let me know what you think.

Best,
Viktor

On Mon, Apr 15, 2019 at 7:01 PM Viktor Somogyi-Vass <viktorsomo...@gmail.com>
wrote:

> A follow up on the batching topic to clarify my points above.
>
> Generally I think that batching should be a core feature as Colin said the
> controller should possess all information that are related.
> Also Cruise Control (or really any 3rd party admin system) might build
> upon this to give more holistic approach to balance brokers. We may cater
> them with APIs that act like building blocks to make their life easier like
> incrementalization, batching, cancellation and rollback but I think the
> more advanced we go we'll need more advanced control surface and Kafka's
> basic tooling might not be suitable for that.
>
> Best,
> Viktor
>
>
> On Mon, 15 Apr 2019, 18:22 Viktor Somogyi-Vass, <viktorsomo...@gmail.com>
> wrote:
>
>> Hey Guys,
>>
>> I'll reply to you all in this email:
>>
>> @Jun:
>> 1. yes, it'd be a good idea to add this feature, I'll write this into the
>> KIP. I was actually thinking about introducing a dynamic config called
>> reassignment.parallel.partition.count and
>> reassignment.parallel.replica.count. The first property would control how
>> many partition reassignment can we do concurrently. The second would go one
>> level in granularity and would control how many replicas do we want to move
>> for a given partition. Also one more thing that'd be useful to fix is that
>> a given list of partition -> replica list would be executed in the same
>> order (from first to last) so it's overall predictable and the user would
>> have some control over the order of reassignments should be specified as
>> the JSON is still assembled by the user.
>> 2. the /kafka/brokers/topics/{topic} znode to be specific. I'll update
>> the KIP to contain this.
>>
>> @Jason:
>> I think building this functionality into Kafka would definitely benefit
>> all the users and that CC as well as it'd simplify their software as you
>> said. As I understand the main advantage of CC and other similar softwares
>> are to give high level features for automatic load balancing. Reliability,
>> stability and predictability of the reassignment should be a core feature
>> of Kafka. I think the incrementalization feature would make it more stable.
>> I would consider cancellation too as a core feature and we can leave the
>> gate open for external tools to feed in their reassignment json as they
>> want. I was also thinking about what are the set of features we can provide
>> for Kafka but I think the more advanced we go the more need there is for an
>> administrative UI component.
>> Regarding KIP-352: Thanks for pointing this out, I didn't see this
>> although lately I was also thinking about the throttling aspect of it.
>> Would be a nice add-on to Kafka since though the above configs provide some
>> level of control, it'd be nice to put an upper cap on the bandwidth and
>> make it monitorable.
>>
>> Viktor
>>
>> On Wed, Apr 10, 2019 at 2:57 AM Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>>> Hi Colin,
>>>
>>> On a related note, what do you think about the idea of storing the
>>> > reassigning replicas in
>>> > /brokers/topics/[topic]/partitions/[partitionId]/state, rather than in
>>> the
>>> > reassignment znode?  I don't think this requires a major change to the
>>> > proposal-- when the controller becomes aware that it should do a
>>> > reassignment, the controller could make the changes.  This also helps
>>> keep
>>> > the reassignment znode from getting larger, which has been a problem.
>>>
>>>
>>> Yeah, I think it's a good idea to store the reassignment state at a finer
>>> level. I'm not sure the LeaderAndIsr znode is the right one though.
>>> Another
>>> option is /brokers/topics/{topic}. That is where we currently store the
>>> replica assignment. I think we basically want to represent both the
>>> current
>>> state and the desired state. This would also open the door to a cleaner
>>> way
>>> to update a reassignment while it is still in progress.
>>>
>>> -Jason
>>>
>>>
>>>
>>>
>>> On Mon, Apr 8, 2019 at 11:14 PM George Li <sql_consult...@yahoo.com
>>> .invalid>
>>> wrote:
>>>
>>> >  Hi Colin / Jason,
>>> >
>>> > Reassignment should really be doing a batches.  I am not too worried
>>> about
>>> > reassignment znode getting larger.  In a real production environment,
>>> too
>>> > many concurrent reassignment and too frequent submission of
>>> reassignments
>>> > seemed to cause latency spikes of kafka cluster.  So
>>> > batching/staggering/throttling of submitting reassignments is
>>> recommended.
>>> >
>>> > In KIP-236,  The "originalReplicas" are only kept for the current
>>> > reassigning partitions (small #), and kept in memory of the controller
>>> > context partitionsBeingReassigned as well as in the znode
>>> > /admin/reassign_partitions,  I think below "setting in the RPC like
>>> null =
>>> > no replicas are reassigning" is a good idea.
>>> >
>>> > There seems to be some issues with the Mail archive server of this
>>> mailing
>>> > list?  I didn't receive email after April 7th, and the archive for
>>> April
>>> > 2019 has only 50 messages (
>>> > http://mail-archives.apache.org/mod_mbox/kafka-dev/201904.mbox/thread)
>>> ?
>>> >
>>> > Thanks,
>>> > George
>>> >
>>> >    on, 08 Apr 2019 17:54:48 GMT  Colin McCabe wrote:
>>> >
>>> >   Yeah, I think adding this information to LeaderAndIsr makes sense.
>>> It
>>> > would be better to track
>>> > "reassigningReplicas" than "originalReplicas", I think.  Tracking
>>> > "originalReplicas" is going
>>> > to involve sending a lot more data, since most replicas in the system
>>> are
>>> > not reassigning
>>> > at any given point.  Or we would need a hack in the RPC like null = no
>>> > replicas are reassigning.
>>> >
>>> > On a related note, what do you think about the idea of storing the
>>> > reassigning replicas in
>>> >  /brokers/topics/[topic]/partitions/[partitionId]/state, rather than in
>>> > the reassignment znode?
>>> >  I don't think this requires a major change to the proposal-- when the
>>> > controller becomes
>>> > aware that it should do a reassignment, the controller could make the
>>> > changes.  This also
>>> > helps keep the reassignment znode from getting larger, which has been a
>>> > problem.
>>> >
>>> > best,
>>> > Colin
>>> >
>>> >
>>> > On Mon, Apr 8, 2019, at 09:29, Jason Gustafson wrote:
>>> > > Hey George,
>>> > >
>>> > > For the URP during a reassignment,  if the "original_replicas" is
>>> kept
>>> > for
>>> > > > the current pending reassignment. I think it will be very easy to
>>> > compare
>>> > > > that with the topic/partition's ISR.  If all "original_replicas"
>>> are in
>>> > > > ISR, then URP should be 0 for that topic/partition.
>>> > >
>>> > >
>>> > > Yeah, that makes sense. But I guess we would need
>>> "original_replicas" to
>>> > be
>>> > > propagated to partition leaders in the LeaderAndIsr request since
>>> leaders
>>> > > are the ones that are computing URPs. That is basically what KIP-352
>>> had
>>> > > proposed, but we also need the changes to the reassignment path.
>>> Perhaps
>>> > it
>>> > > makes more sense to address this problem in KIP-236 since that is
>>> where
>>> > you
>>> > > have already introduced "original_replicas"? I'm also happy to do
>>> KIP-352
>>> > > as a follow-up to KIP-236.
>>> > >
>>> > > Best,
>>> > > Jason
>>> > >
>>> > >
>>> > > On Sun, Apr 7, 2019 at 5:09 PM Ismael Juma <isma...@gmail.com>
>>> wrote:
>>> > >
>>> > > > Good discussion about where we should do batching. I think if
>>> there is
>>> > a
>>> > > > clear great way to batch, then it makes a lot of sense to just do
>>> it
>>> > once.
>>> > > > However, if we think there is scope for experimenting with
>>> different
>>> > > > approaches, then an API that tools can use makes a lot of sense.
>>> They
>>> > can
>>> > > > experiment and innovate. Eventually, we can integrate something
>>> into
>>> > Kafka
>>> > > > if it makes sense.
>>> > > >
>>> > > > Ismael
>>> > > >
>>> > > > On Sun, Apr 7, 2019, 11:03 PM Colin McCabe <cmcc...@apache.org>
>>> wrote:
>>> > > >
>>> > > > > Hi George,
>>> > > > >
>>> > > > > As Jason was saying, it seems like there are two directions we
>>> could
>>> > go
>>> > > > > here: an external system handling batching, and the controller
>>> > handling
>>> > > > > batching.  I think the controller handling batching would be
>>> better,
>>> > > > since
>>> > > > > the controller has more information about the state of the
>>> system.
>>> > If
>>> > > > the
>>> > > > > controller handles batching, then the controller could also
>>> handle
>>> > things
>>> > > > > like setting up replication quotas for individual partitions.
>>> The
>>> > > > > controller could do things like throttle replication down if the
>>> > cluster
>>> > > > > was having problems.
>>> > > > >
>>> > > > > We kind of need to figure out which way we're going to go on
>>> this one
>>> > > > > before we set up big new APIs, I think.  If we want an external
>>> > system to
>>> > > > > handle batching, then we can keep the idea that there is only one
>>> > > > > reassignment in progress at once.  If we want the controller to
>>> > handle
>>> > > > > batching, we will need to get away from that idea.  Instead, we
>>> > should
>>> > > > just
>>> > > > > have a bunch of "ideal assignments" that we tell the controller
>>> > about,
>>> > > > and
>>> > > > > let it decide how to do the batching.  These ideal assignments
>>> could
>>> > > > change
>>> > > > > continuously over time, so from the admin's point of view, there
>>> > would be
>>> > > > > no start/stop/cancel, but just individual partition reassignments
>>> > that we
>>> > > > > submit, perhaps over a long period of time.  And then
>>> cancellation
>>> > might
>>> > > > > just mean cancelling just that individual partition reassignment,
>>> > not all
>>> > > > > partition reassignments.
>>> > > > >
>>> > > > > best,
>>> > > > > Colin
>>> > > > >
>>> > > > > On Fri, Apr 5, 2019, at 19:34, George Li wrote:
>>> > > > > >  Hi Jason / Viktor,
>>> > > > > >
>>> > > > > > For the URP during a reassignment,  if the "original_replicas"
>>> is
>>> > kept
>>> > > > > > for the current pending reassignment. I think it will be very
>>> easy
>>> > to
>>> > > > > > compare that with the topic/partition's ISR.  If all
>>> > > > > > "original_replicas" are in ISR, then URP should be 0 for that
>>> > > > > > topic/partition.
>>> > > > > >
>>> > > > > > It would be also nice to separate the metrics MaxLag/TotalLag
>>> for
>>> > > > > > Reassignments. I think that will also require
>>> "original_replicas"
>>> > (the
>>> > > > > > topic/partition's replicas just before reassignment when the AR
>>> > > > > > (Assigned Replicas) is set to Set(original_replicas) +
>>> > > > > > Set(new_replicas_in_reassign_partitions) ).
>>> > > > > >
>>> > > > > > Thanks,
>>> > > > > > George
>>> > > > > >
>>> > > > > >     On Friday, April 5, 2019, 6:29:55 PM PDT, Jason Gustafson
>>> > > > > > <ja...@confluent.io> wrote:
>>> > > > > >
>>> > > > > >  Hi Viktor,
>>> > > > > >
>>> > > > > > Thanks for writing this up. As far as questions about overlap
>>> with
>>> > > > > KIP-236,
>>> > > > > > I agree it seems mostly orthogonal. I think KIP-236 may have
>>> had a
>>> > > > larger
>>> > > > > > initial scope, but now it focuses on cancellation and batching
>>> is
>>> > left
>>> > > > > for
>>> > > > > > future work.
>>> > > > > >
>>> > > > > > With that said, I think we may not actually need a KIP for the
>>> > current
>>> > > > > > proposal since it doesn't change any APIs. To make it more
>>> > generally
>>> > > > > > useful, however, it would be nice to handle batching at the
>>> > partition
>>> > > > > level
>>> > > > > > as well as Jun suggests. The basic question is at what level
>>> > should the
>>> > > > > > batching be determined. You could rely on external processes
>>> (e.g.
>>> > > > cruise
>>> > > > > > control) or it could be built into the controller. There are
>>> > tradeoffs
>>> > > > > > either way, but I think it simplifies such tools if it is
>>> handled
>>> > > > > > internally. Then it would be much safer to submit a larger
>>> > reassignment
>>> > > > > > even just using the simple tools that come with Kafka.
>>> > > > > >
>>> > > > > > By the way, since you are looking into some of the reassignment
>>> > logic,
>>> > > > > > another problem that we might want to address is the misleading
>>> > way we
>>> > > > > > report URPs during a reassignment. I had a naive proposal for
>>> this
>>> > > > > > previously, but it didn't really work
>>> > > > > >
>>> > > > >
>>> > > >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-352%3A+Distinguish+URPs+caused+by+reassignment
>>> > > > > .
>>> > > > > > Potentially fixing that could fall under this work as well if
>>> you
>>> > think
>>> > > > > > it
>>> > > > > > makes sense.
>>> > > > > >
>>> > > > > > Best,
>>> > > > > > Jason
>>> > > > > >
>>> > > > > > On Thu, Apr 4, 2019 at 4:49 PM Jun Rao <j...@confluent.io>
>>> wrote:
>>> > > > > >
>>> > > > > > > Hi, Viktor,
>>> > > > > > >
>>> > > > > > > Thanks for the KIP. A couple of comments below.
>>> > > > > > >
>>> > > > > > > 1. Another potential thing to do reassignment incrementally
>>> is to
>>> > > > move
>>> > > > > a
>>> > > > > > > batch of partitions at a time, instead of all partitions.
>>> This
>>> > may
>>> > > > > lead to
>>> > > > > > > less data replication since by the time the first batch of
>>> > partitions
>>> > > > > have
>>> > > > > > > been completely moved, some data of the next batch may have
>>> been
>>> > > > > deleted
>>> > > > > > > due to retention and doesn't need to be replicated.
>>> > > > > > >
>>> > > > > > > 2. "Update CR in Zookeeper with TR for the given partition".
>>> > Which
>>> > ZK
>>> > > > > path
>>> > > > > > > is this for?
>>> > > > > > >
>>> > > > > > > Jun
>>> > > > > > >
>>> > > > > > > On Sat, Feb 23, 2019 at 2:12 AM Viktor Somogyi-Vass <
>>> > > > > > > viktorsomo...@gmail.com>
>>> > > > > > > wrote:
>>> > > > > > >
>>> > > > > > > > Hi Harsha,
>>> > > > > > > >
>>> > > > > > > > As far as I understand KIP-236 it's about enabling
>>> reassignment
>>> > > > > > > > cancellation and as a future plan providing a queue of
>>> replica
>>> > > > > > > reassignment
>>> > > > > > > > steps to allow manual reassignment chains. While I agree
>>> that
>>> > the
>>> > > > > > > > reassignment chain has a specific use case that allows fine
>>> > grain
>>> > > > > control
>>> > > > > > > > over reassignment process, My proposal on the other hand
>>> > doesn't
>>> > > > talk
>>> > > > > > > about
>>> > > > > > > > cancellation but it only provides an automatic way to
>>> > > > incrementalize
>>> > > > > an
>>> > > > > > > > arbitrary reassignment which I think fits the general use
>>> case
>>> > > > where
>>> > > > > > > users
>>> > > > > > > > don't want that level of control but still would like a
>>> > balanced
>>> > > > way
>>> > > > > of
>>> > > > > > > > reassignments. Therefore I think it's still relevant as an
>>> > > > > improvement of
>>> > > > > > > > the current algorithm.
>>> > > > > > > > Nevertheless I'm happy to add my ideas to KIP-236 as I
>>> think
>>> > it
>>> > > > > would be
>>> > > > > > > a
>>> > > > > > > > great improvement to Kafka.
>>> > > > > > > >
>>> > > > > > > > Cheers,
>>> > > > > > > > Viktor
>>> > > > > > > >
>>> > > > > > > > On Fri, Feb 22, 2019 at 5:05 PM Harsha <ka...@harsha.io>
>>> > wrote:
>>> > > > > > > >
>>> > > > > > > > > Hi Viktor,
>>> > > > > > > > >            There is already KIP-236 for the same feature
>>> > and
>>> > > > George
>>> > > > > > > made
>>> > > > > > > > > a PR for this as well.
>>> > > > > > > > > Lets consolidate these two discussions. If you have any
>>> > cases
>>> > > > that
>>> > > > > are
>>> > > > > > > > not
>>> > > > > > > > > being solved by KIP-236 can you please mention them in
>>> > that
>>> > > > > thread. We
>>> > > > > > > > can
>>> > > > > > > > > address as part of KIP-236.
>>> > > > > > > > >
>>> > > > > > > > > Thanks,
>>> > > > > > > > > Harsha
>>> > > > > > > > >
>>> > > > > > > > > On Fri, Feb 22, 2019, at 5:44 AM, Viktor Somogyi-Vass
>>> wrote:
>>> > > > > > > > > > Hi Folks,
>>> > > > > > > > > >
>>> > > > > > > > > > I've created a KIP about an improvement of the
>>> reassignment
>>> > > > > algorithm
>>> > > > > > > > we
>>> > > > > > > > > > have. It aims to enable partition-wise incremental
>>> > > > reassignment.
>>> > > > > The
>>> > > > > > > > > > motivation for this is to avoid excess load that the
>>> > current
>>> > > > > > > > replication
>>> > > > > > > > > > algorithm implicitly carries as in that case there
>>> > are points
>>> > > > in
>>> > > > > the
>>> > > > > > > > > > algorithm where both the new and old replica set could
>>> > be
>>> > > > online
>>> > > > > and
>>> > > > > > > > > > replicating which puts double (or almost double)
>>> pressure
>>> > on
>>> > > > the
>>> > > > > > > > brokers
>>> > > > > > > > > > which could cause problems.
>>> > > > > > > > > > Instead my proposal would slice this up into several
>>> > steps
>>> > > > where
>>> > > > > each
>>> > > > > > > > > step
>>> > > > > > > > > > is calculated based on the final target replicas and
>>> > the
>>> > > > current
>>> > > > > > > > replica
>>> > > > > > > > > > assignment taking into account scenarios where brokers
>>> > could be
>>> > > > > > > offline
>>> > > > > > > > > and
>>> > > > > > > > > > when there are not enough replicas to fulfil the
>>> > > > > min.insync.replica
>>> > > > > > > > > > requirement.
>>> > > > > > > > > >
>>> > > > > > > > > > The link to the KIP:
>>> > > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > > >
>>> > > > >
>>> > > >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-435%3A+Incremental+Partition+Reassignment
>>> > > > > > > > > >
>>> > > > > > > > > > I'd be happy to receive any feedback.
>>> > > > > > > > > >
>>> > > > > > > > > > An important note is that this KIP and another one,
>>> > KIP-236
>>> > > > that
>>> > > > > is
>>> > > > > > > > > > about
>>> > > > > > > > > > interruptible reassignment (
>>> > > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > > >
>>> > > > >
>>> > > >
>>> >
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%3A+Interruptible+Partition+Reassignment
>>> > > > > > > > > )
>>> > > > > > > > > > should be compatible.
>>> > > > > > > > > >
>>> > > > > > > > > > Thanks,
>>> > > > > > > > > > Viktor
>>> > > > > > > > > >
>>> > > > > > > > >
>>> > > > > > > >
>>> > > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>

Reply via email to