Hey Jason,

Thanks a lot for all the valuable feedback!
On Wed, Feb 28, 2018 at 11:09 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hi Dong,
>
> Great work on this proposal! Just a couple initial comments:
>
> My understanding is that the consumer will block on a topic until all
> partitions have reached a certain partition epoch. What are the
> implications if a partition is offline? If we observe an epoch change
> while a partition is offline, it seems like we'd have to wait until the
> partition is back online before we can begin consuming the new epoch.
> Otherwise we will violate the ordering guarantees. Many use cases
> involve unordered data, so this would be a kind of regression in
> behavior, wouldn't it? A couple ideas:
>
> 1. Maybe we could have a topic configuration that controls whether or
> not ordering on the topic needs to be strictly followed? If we don't
> care about ordering, the consumer need not synchronize on epoch
> boundaries and we need not care about offline partitions.

Very good point. Although a Kafka admin should make sure that offline partitions happen only rarely (with a proper replication factor and so on), I agree that it is bad for partitions of a non-keyed topic to block waiting for an offline partition. I have updated the KIP to include enable.ordered.delivery as a new topic-level config. Since offline partitions should be very rare and most users of keyed topics will want the ordering guarantee by default, I have set this config to true by default. I don't have a strong opinion on the default value, though.

> 2. Waiting on all partitions allows for any key partitioning function.
> It's good because it's general, but it is overly conservative when the
> partitioning function has finer control over key movement. For example,
> if the partitioner only allows for splits, then there is just one
> partition to await before consuming a new epoch for any given
> partition. I am not sure what it would look like, but I'm wondering if
> it would be possible to leverage the custom partitioning logic on the
> consumer side as well to avoid unneeded waiting.

Yeah, I have thought about this. That is why I originally wanted to always double the partition number of a topic. For example, if we double the partition number from 6 to 12, we know that a consumer only needs to wait for partition 1 before consuming the first message in partition 7. I think we may be able to optimize consumer-side performance while still allowing expansion to an arbitrary partition number (i.e. not limited to doubling) by using the linear hashing algorithm that Jay suggested. I will think more about it.
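To make this concrete, below is a rough sketch (not from the KIP; the class and method names, and the assumption that partitions split in strict linear-hashing order, are mine purely for illustration) of how a consumer could compute the single parent partition it would need to wait for:

/**
 * Hypothetical sketch of the consumer-side computation under linear
 * hashing. Assumes partitions split in order 0, 1, 2, ... and that a
 * new partition p takes over keys from partition (p - roundStart),
 * where roundStart is the partition count at the start of the doubling
 * round in which p was created.
 */
public final class LinearHashingSketch {

    /**
     * Returns the parent partition whose epoch the consumer must reach
     * before consuming the first message of 'partition', or -1 if the
     * partition is one of the original ones (no waiting needed).
     */
    public static int parentPartition(int partition, int initialCount) {
        if (partition < initialCount)
            return -1;
        // Find the partition count at the start of the round in which
        // 'partition' was created: the largest initialCount * 2^k <= partition.
        int roundStart = initialCount;
        while (2 * roundStart <= partition)
            roundStart *= 2;
        return partition - roundStart;
    }

    public static void main(String[] args) {
        // Expanding from 6 partitions: partition 7 splits from 7 - 6 = 1,
        // so a consumer of partition 7 waits only on partition 1 instead
        // of on all six pre-expansion partitions.
        System.out.println(parentPartition(7, 6));  // prints 1
        // Second round (12 -> 24): partition 13 also splits from 1.
        System.out.println(parentPartition(13, 6)); // prints 1
    }
}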
> I think piggybacking the epoch exchanges onto the consumer heartbeats
> is a good idea. Just wanted to mention that consumers are not the only
> ones using the heartbeat API. For example, Kafka Connect also uses the
> group protocol to balance its load. Of course other use cases could
> leave these fields empty, but it's a little odd to have the protocol
> tailored specifically for one use case. To be honest, the group
> management protocol is one of the messier Kafka APIs and I don't think
> anyone is satisfied with the current approach. We need not redesign the
> whole thing in this KIP, but it might be nice to consider some options
> so that we're sure we're either heading in a better direction or at
> least not making things more confusing than they already are. The
> challenge is that it's useful to have some coordinator logic specific
> to the group type. I can imagine down the road that other use cases may
> also have some custom metadata which they need to piggyback on the
> heartbeat and they may also need the coordinator to do some
> facilitation. Maybe the heartbeat protocol could be left generic and we
> could have a separate module in the GroupCoordinator for custom
> consumer logic? Not too sure the best way to go.

Good point. I wasn't aware that HeartbeatRequest and HeartbeatResponse are also used outside the consumer. Since they are used for other purposes, I have instead created ConsumerGroupPositionRequest and ConsumerGroupPositionResponse as suggested.

> Thanks,
> Jason
>
> On Tue, Feb 27, 2018 at 11:49 PM, Stephane Maarek <
> steph...@simplemachines.com.au> wrote:
>
> > Sounds awesome!
> > Are you planning to have auto scaling of partitions in a following
> > KIP? That would be the holy grail.
> >
> > On 28 Feb. 2018 5:13 pm, "Dong Lin" <lindon...@gmail.com> wrote:
> >
> > > Hey Jan,
> > >
> > > I am not sure it is acceptable for the producer to be stopped for a
> > > while, particularly for an online application which requires low
> > > latency. I am also not sure how consumers can switch to a new
> > > topic. Does the user application need to explicitly specify a
> > > different topic for the producer and consumer to use? It would be
> > > helpful for the discussion if you could provide more detail on the
> > > interface change for this solution.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <
> > > jan.filip...@trivago.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > Just want to throw my thought in. In general the functionality is
> > > > very useful, though we should not try too hard to force the
> > > > architecture while implementing it.
> > > >
> > > > The manual steps would be to:
> > > >
> > > > 1. Create a new topic.
> > > > 2. Run MirrorMaker from the old topic to the new topic.
> > > > 3. Wait for mirroring to catch up.
> > > > 4. Then put the consumers onto the new topic, having MirrorMaker
> > > >    spit out a mapping from old offsets to new offsets: if the
> > > >    partition count is increased by a factor X, there is going to
> > > >    be a clean mapping from 1 offset in the old topic to X offsets
> > > >    in the new topic; if there is no integer factor, there is no
> > > >    way to generate a mapping that can reasonably be used for
> > > >    continuing.
> > > > 5. Make consumers stop at appropriate points and continue
> > > >    consumption with offsets from the mapping.
> > > > 6. Have the producers stop for a minimal time.
> > > > 7. Wait for MirrorMaker to finish.
> > > > 8. Let the producers produce with the new metadata.
> > > >
> > > > Instead of implementing the approach suggested in the KIP, which
> > > > will leave log compacted topics completely crumbled and unusable,
> > > > I would much rather try to build infrastructure to support the
> > > > operations mentioned above more smoothly. Especially having
> > > > producers stop and use another topic is difficult, and it would be
> > > > nice if one could trigger "invalid metadata" exceptions for them,
> > > > and if one could give topics aliases so that their produce
> > > > requests to the old topic will arrive in the new topic.
> > > >
> > > > The downsides are obvious, I guess (having the same data twice for
> > > > the transition period, but Kafka tends to scale well with data
> > > > size). So it is a nicer fit into the architecture.
> > > >
> > > > I further want to argue that the functionality proposed by the KIP
> > > > can be implemented completely in "userland" with a custom
> > > > partitioner that handles the transition as needed. I would
> > > > appreciate it if someone could point out what a custom partitioner
> > > > couldn't handle in this case.
> > > >
> > > > With the above approach, shrinking a topic becomes the same steps,
> > > > without losing keys in the discontinued partitions.
> > > >
> > > > Would love to hear what everyone thinks.
> > > >
> > > > Best,
> > > > Jan
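For concreteness, here is a minimal sketch of the kind of "userland" partitioner Jan describes, assuming the application knows the pre-expansion partition count and decides the cutover point itself (the class name, OLD_COUNT, and the cutover flag are illustrative assumptions, not from the KIP or any Kafka release):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

/**
 * Sketch of a transition-aware partitioner: it keeps keys on their
 * pre-expansion partitions until an application-defined cutover, then
 * switches to the full partition count. Assumes all records are keyed.
 */
public class TransitionAwarePartitioner implements Partitioner {
    private static final int OLD_COUNT = 6;       // partition count before expansion (assumed)
    private volatile boolean cutoverDone = false; // flipped by application logic (assumed)

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int newCount = cluster.partitionCountForTopic(topic);
        int count = cutoverDone ? newCount : OLD_COUNT;
        // Same murmur2 hashing scheme as the default partitioner.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % count;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

Note that this only controls where the producer writes; it does not by itself give consumers a way to preserve per-key ordering across the cutover, which is the part the KIP addresses.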
> > > > On 11.02.2018 00:35, Dong Lin wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I have created KIP-253: Support in-order message delivery with
> > > >> partition expansion. See
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> > > >>
> > > >> This KIP provides a way to allow messages of the same key from
> > > >> the same producer to be consumed in the same order they are
> > > >> produced even if we expand the partitions of the topic.
> > > >>
> > > >> Thanks,
> > > >> Dong