Hello Dong, thanks a lot for proposing the KIP!

I'm in favor of partition splitting, or more generally a consistent hashing
mechanism, over partition rehashing, for the reasons that Matthias has
summarized well. Another point concerns downstream stateful consumers that
need to reshard their state when num.partitions changes. With general
partition rehashing, these consumers need to know the partitioning scheme
that producers use in order to reshard correctly, which may not always be
possible since 1) the partitioner can be customized by producers today and
is unknown to consumers, and 2) for shared topics, we may not even have
control over the partitioning schemes of all the producers writing to them.
With a consistent hashing mechanism, consumers do not need to presume which
partitioning scheme producers have used; they only need to know the
splitting scheme that brokers are using, assuming that the producers'
partitioning follows that restriction during the scaling procedure, which
is more likely in practice (I think Allen has expressed the same opinion in
his previous email, but please correct me if I'm wrong). Admittedly though,
upgrading producer clients that today use the default murmur hash based
purely on num.partitions to a more advanced linear hashing mechanism would
be a big operational endeavor.
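
To make the difference concrete, here is a rough sketch (not the KIP's
implementation) comparing plain modulo rehashing with a linear-hashing style
partitioner when a topic grows from 100 to 110 partitions. The function names
are made up for illustration, and zlib.crc32 is only a stand-in for the
producer's murmur2 hash.

```python
import zlib


def stable_hash(key: bytes) -> int:
    # Assumption: CRC32 stands in for the producer's murmur2 hash.
    return zlib.crc32(key)


def modulo_partition(key: bytes, num_partitions: int) -> int:
    # Plain "hash mod num.partitions" scheme.
    return stable_hash(key) % num_partitions


def linear_partition(key: bytes, num_partitions: int) -> int:
    # Linear-hashing style scheme: partitions are split one at a time.
    h = stable_hash(key)
    base = 1 << (num_partitions.bit_length() - 1)  # largest power of 2 <= n
    p = h % base
    if p < num_partitions - base:  # this bucket has already been split
        p = h % (2 * base)
    return p


def moved_keys(partition_fn, keys, old_count, new_count):
    # Keys whose partition changes when the partition count changes.
    return [k for k in keys
            if partition_fn(k, old_count) != partition_fn(k, new_count)]


keys = [f"key-{i}".encode() for i in range(10000)]
moved_mod = moved_keys(modulo_partition, keys, 100, 110)
moved_lin = moved_keys(linear_partition, keys, 100, 110)
# Old partitions that lose keys under the linear scheme.
source_partitions = {linear_partition(k, 100) for k in moved_lin}

print("moved with modulo rehashing:", len(moved_mod))
print("moved with linear hashing:  ", len(moved_lin))
print("old partitions affected:    ", sorted(source_partitions))
```

With the linear scheme, only old partitions 36 to 45 give up keys, and those
keys land only in the new partitions 100 to 109, so a consumer can reshard
knowing just the splitting rule and not the producer's original partitioner.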

As for Kafka Streams in particular, I think consistent hashing can still
reduce the end-to-end rebalancing latency compared with partition
rehashing, as long as Streams can pre-build the standby replicas for the
new tasks (or for tasks that are going to be merged, in the case of
partition removal or partition merging) when it knows a rebalance is coming
(some related proposals are discussed in KAFKA-6145). That is indeed the
case here, as Jun and Matthias mentioned, and consistent hashing could help
reduce the workload of such a pre-build process between step 1 and step 2.
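
As a rough illustration of why the pre-build is cheaper, here is a sketch
under the assumption of a linear-hashing style split: the state for a new
partition comes from exactly one parent partition, so a standby only has to
scan that parent's state. The helper names are hypothetical (nothing here is
a Streams API), and zlib.crc32 again stands in for the real key hash.

```python
import zlib


def parent_partition(new_partition: int) -> int:
    # Under linear hashing, a new partition is split off from the partition
    # obtained by clearing its highest set bit (e.g. 100 comes from 36).
    if new_partition < 1:
        raise ValueError("partition 0 has no parent")
    return new_partition - (1 << (new_partition.bit_length() - 1))


def split_state(parent_state: dict, new_partition: int):
    # Divide the parent's key/value state into the part that stays and the
    # part that moves to the new partition.
    width = 2 << (new_partition.bit_length() - 1)  # modulus after the split
    stay, move = {}, {}
    for key, value in parent_state.items():
        target = move if zlib.crc32(key) % width == new_partition else stay
        target[key] = value
    return stay, move


# Hypothetical state held by the task that owns partition 36 of 100.
parent = parent_partition(100)
state = {k: i for i, k in enumerate(f"key-{i}".encode() for i in range(10000))
         if zlib.crc32(k) % 64 == parent}
stay, move = split_state(state, 100)
print("parent:", parent, "stays:", len(stay), "moves:", len(move))
```

With general rehashing, by contrast, the state for a new partition could come
from every existing partition, so the pre-build would have to touch all tasks.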


Guozhang





On Sun, Mar 4, 2018 at 6:13 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> About partition splitting and load balancing:
>
> I think there are two cases:
>
> 1) one partition is a hot-spot: for this case, splitting a partition is
> actually more desirable than adding a new partition, as it ensures that
> the load of the hot partition is reduced. Adding partitions does not
> help here.
>
> 2) there is "perfect" load balancing: for this case, if load needs to be
> reduced, adding one partition would not really help a lot, as it would
> not reduce the load per partition by much. Assume there are 20 partitions
> in the topic: to notably reduce the load, one would need to add, for
> example, 10 partitions to reduce the load of the old partitions by 30% --
> thus, maybe doubling the number of partitions (via splitting) might not
> be too bad compared to adding 10 partitions? Considering that there is
> never a "perfectly" balanced load, in practice it might not be required
> to split all partitions, but only those partitions with the highest load --
> this would mitigate the issue of unbalanced load if not all partitions
> are split.
>
>
>
> About consumer state: not sure how a tool could accomplish this. What
> "signal" would you send to a consumer? Also, how can an admin make sure
> that all consumers are informed?
>
> In Kafka Streams, we get all information during a rebalance. Thus, it
> would help if we got the information about upcoming new partitions in
> this phase. This would allow us to split the state of a partition
> proactively while still consuming from the non-split partitions. This
> is also helpful, as we need to update the partitions for changelog
> topics, too, and split those accordingly.
>
> With regard to the delay: for Kafka Streams we need more control over
> this. Thus, two admin requests would work better: first to prepare the
> consumer (i.e., add partitions "read-only", triggering a rebalance),
> second to enable the new partitions for the producer (maybe also
> triggering a rebalance). If it were time-delay based, it might be
> problematic for Kafka Streams: if we get the information that the new
> input partitions are available for producing, we need to enable the new
> changelog partitions for producing, too. If those were not available
> yet because the time delay had not elapsed, it would be hard to avoid
> crashing.
>
>
> I did not think about a design for Kafka Streams in detail yet, but the
> above sketch might lead to a viable solution here. Not sure, if a
> similar protocol would work for Connect, too.
>
>
> -Matthias
>
> On 3/3/18 5:17 PM, Dong Lin wrote:
> > Hey Jay, Allen, Jason, Jun,
> >
> > Thanks much for the valuable feedback. It is a very good point that we
> > can reduce the state that needs to be stored/loaded by using linear
> > hashing. I have updated the KIP to use linear hashing as suggested. The
> > benefits of using linear hashing are summarized in "Changes in how
> > producer constructs ProduceRequest" in the KIP.
> >
> > And good point regarding the number of partitions that will be blocked
> > on metadata update due to partition number change (as compared to
> > leadership movement). I have updated the KIP such that the producer will
> > not be blocked on updating metadata due to partition number change if
> > enable.ordered.delivery is overridden to false.
> >
> > As Jun commented, the downside of linear hashing is that the load may
> > not be distributed evenly across partitions unless the number of
> > partitions is doubled -- some partitions may be twice as loaded as other
> > partitions. In reality this may not be an issue. For large topics that
> > require partition expansion, the partition number is likely to be much
> > larger than the number of consumers and the number of brokers. We can
> > still balance the load across consumers (and brokers) with a proper
> > assignor (and reassignment across brokers).
> >
> > Regarding the suggestion to let the producer produce to newly added
> > partitions a configured amount of time after the consumer gets notified
> > of the new partitions, it may be simpler to do it outside core Kafka.
> > For example, we can provide a tool to notify consumers in Kafka Streams
> > of the future partition expansion so that those consumers can start to
> > store/load state. The system administrator can run this tool first and
> > expand the partitions of the topic later.
> >
> > Alternatively we can do this optimization in core Kafka, such that the
> > timestamp of the partition change is written in zookeeper, the
> > consumer_undeleted_partition_count is included in metadata in addition
> > to the existing undeleted_partition_count used by the producer, and the
> > controller will update undeleted_partition_count in metadata at
> > timestamp_of_change + configured_delay. It just seems to be a bit
> > clumsy. It would be great to hear opinions regarding this choice.
> >
> >
> > Thanks!
> > Dong
> >
> >
> > On Fri, Mar 2, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:
> >
> >> linear hashing (or partition splitting) vs rehashing every key: The
> >> benefit of the former is that it reduces the # of partitions to which
> >> keys from an existing partition are re-distributed, which potentially
> >> reduces the overhead of rebuilding the state in a consumer. The downside
> >> is that the load may not be distributed evenly across partitions unless
> >> # partitions is doubled. When the number of existing partitions is
> >> already large, one may not want to always double the partitions.
> >>
> >> Even with linear hashing, certain consumer instances within a consumer
> >> group still need to rebuild the state for some partitions. This can
> >> still affect the overall latency of a streaming job if the processing
> >> depends on data coming from all partitions. Another way to improve this
> >> is to add partitions in two steps. In the first step, new partitions
> >> will be added and exposed to the consumer, but not the producer. After
> >> this step, the consumer can start preparing the state for the new
> >> partitions, but won't need to use them since there is no data in those
> >> new partitions yet. In the second step, the producer can start
> >> publishing to the new partitions. At this point, the consumer needs to
> >> process the data in the new partitions. However, if the state for the
> >> new partitions is almost ready, the amount of waiting will be minimal.
> >> We can potentially add a new config that controls the delay between the
> >> 2 steps.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Fri, Mar 2, 2018 at 1:28 PM, Jay Kreps <j...@confluent.io> wrote:
> >>
> >>> Hey Dong,
> >>>
> >>> Cool, obviously we'd need to have a solution here work with connect and
> >>> streams to be viable.
> >>>
> >>> On the linear hashing thing, what I am talking about is something
> >>> different. I am talking about splitting existing partitions
> >>> incrementally, e.g. if you have 100 partitions and want to move to 110.
> >>> Obviously a naive approach which added partitions would require you to
> >>> reshuffle all data, as the hashing of all data would change. A linear
> >>> hashing-like scheme gives an approach by which you can split individual
> >>> partitions one at a time to avoid needing to reshuffle much data. This
> >>> approach has the benefit that at any time you have a fixed number of
> >>> partitions and all data is fully partitioned with whatever partition
> >>> count you choose, but it also has the benefit that you can dynamically
> >>> scale the partition count up or down. This seems like it simplifies
> >>> things like log compaction etc.
> >>>
> >>> -Jay
> >>>
> >>> On Sun, Feb 25, 2018 at 3:51 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>>
> >>>> Hey Jay,
> >>>>
> >>>> Thanks for the comment!
> >>>>
> >>>> I have not specifically thought about how this works with Streams and
> >>>> Connect. The current KIP is written w.r.t. the interface that our
> >>>> producer and consumer expose to the user. It ensures that if there are
> >>>> two messages with the same key produced by the same producer, say
> >>>> messageA and messageB, and suppose messageB is produced after messageA
> >>>> to a different partition than messageA, then we can guarantee that the
> >>>> following sequence happens in order:
> >>>>
> >>>> - Consumer of messageA can execute callback, in which user can flush
> >>>>   state related to the key of messageA.
> >>>> - messageA is delivered by its consumer to the application.
> >>>> - Consumer of messageB can execute callback, in which user can load
> >>>>   the state related to the key of messageB.
> >>>> - messageB is delivered by its consumer to the application.
> >>>>
> >>>> So it seems that it should support Streams and Connect properly. But I
> >>>> am not entirely sure because I have not looked into how Streams and
> >>>> Connect work. I can think about it more if you can provide an example
> >>>> where this does not work for Streams and Connect.
> >>>>
> >>>> Regarding the second question, I think the linear hashing approach
> >>>> provides a way to reduce the number of partitions that can "conflict"
> >>>> with a given partition to *log_2(n)*, as compared to *n* in the current
> >>>> KIP, where n is the total number of partitions of the topic. This will
> >>>> be useful when the number of partitions is large and asymptotic
> >>>> complexity matters.
> >>>>
> >>>> I personally don't think this optimization is worth the additional
> >>>> complexity in Kafka. This is because partition expansion or deletion
> >>>> should happen infrequently and the largest number of partitions of a
> >>>> single topic today is not that large -- probably 1000 or less. And when
> >>>> the partitions of a topic change, each consumer will likely need to
> >>>> query and wait for positions of a large percentage of partitions of the
> >>>> topic anyway, even with this optimization. I think this algorithm is
> >>>> kind of orthogonal to this KIP. We can extend the KIP to support this
> >>>> algorithm in the future as well.
> >>>>
> >>>> Thanks,
> >>>> Dong
> >>>>
> >>>> On Thu, Feb 22, 2018 at 5:19 PM, Jay Kreps <j...@confluent.io> wrote:
> >>>>
> >>>>> Hey Dong,
> >>>>>
> >>>>> Two questions:
> >>>>> 1. How will this work with Streams and Connect?
> >>>>> 2. How does this compare to a solution where we physically split
> >>>>> partitions using a linear hashing approach (the partition number is
> >>>>> equivalent to the hash bucket in a hash table)?
> >>>>> https://en.wikipedia.org/wiki/Linear_hashing
> >>>>>
> >>>>> -Jay
> >>>>>
> >>>>> On Sat, Feb 10, 2018 at 3:35 PM, Dong Lin <lindon...@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> I have created KIP-253: Support in-order message delivery with
> >>>>>> partition expansion. See
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> >>>>>> .
> >>>>>>
> >>>>>> This KIP provides a way to allow messages of the same key from the
> >>>>>> same producer to be consumed in the same order they are produced even
> >>>>>> if we expand the partitions of the topic.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Dong
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
>


-- 
-- Guozhang
