Hey Jason,

Thanks much for all the valuable feedback!

On Wed, Feb 28, 2018 at 11:09 AM, Jason Gustafson <ja...@confluent.io>
wrote:

> Hi Dong,
>
> Great work on this proposal! Just a couple initial comments:
>
> My understanding is that the consumer will block on a topic until all the
> partitions have reached a certain partition epoch. What are the
> implications if a partition is offline? If we observe an epoch change while
> a partition is offline, it seems like we'd have to wait until the partition
> is back online before we can begin consuming the new epoch. Otherwise we
> will violate the ordering guarantees. Many use cases involve unordered
> data, so this would be a kind of regression in behavior, wouldn't it? A
> couple ideas:
>
> 1. Maybe we could have a topic configuration that controls whether or not
> ordering on the topic needs to be strictly followed? If we don't care about
> ordering, the consumer need not synchronize on epoch boundaries and we need
> not care about offline partitions.
>

Very good point. Though the Kafka admin should make sure that offline
partitions happen only rarely (with a proper replication factor, etc.), I
agree that it is bad to have partitions of a non-keyed topic block while
waiting for an offline partition.

I have updated the KIP to include enable.ordered.delivery as a new
topic-level config. Since offline partitions should happen very rarely and
most users of keyed topics would want the ordering guarantee by default, I
have set this config to true by default. I don't have a strong opinion on
the default value, though.
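
To make the intended usage concrete, here is a rough sketch (illustrative
only, assuming the config keeps the name enable.ordered.delivery; the topic
name, counts and addresses below are made up) of how an admin could create
an unordered topic with the Java AdminClient:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateUnorderedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Proposed topic-level config from this KIP; defaults to true.
            // Setting it to false would let consumers skip synchronizing on
            // epoch boundaries, so an offline partition does not block the
            // other partitions of the topic.
            Map<String, String> configs =
                Collections.singletonMap("enable.ordered.delivery", "false");

            NewTopic topic = new NewTopic("clickstream-unkeyed", 12, (short) 3)
                .configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}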



> 2. Waiting on all partitions allows for any key partitioning function. It's
> good because it's general, but it is overly conservative when the
> partitioning function has finer control over key movement. For example, if
> the partitioner only allows for splits, then there is just one partition to
> await before consuming a new epoch for any given partition. I am not sure
> what it would look like, but I'm wondering if it would be possible to
> leverage the custom partitioning logic on the consumer side as well to
> avoid unneeded waiting.


Yeah, I have thought about this. That is why I originally wanted to always
double the partition number of a topic. For example, if we double the
partition number from 6 to 12, we know that the consumer only needs to wait
for partition 1 before consuming the first message in partition 7.

I think we may be able to optimize the consumer-side performance while
still allowing partition expansion to an arbitrary number (i.e. not limited
to doubling) by using the linear hashing algorithm that Jay suggested. I
will think more about it.
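
To make the doubling case concrete, here is a small sketch of my own (not
code from the KIP) showing how a consumer could compute the single "parent"
partition it needs to wait on, assuming the default hash-modulo
partitioning and a partition count that grows by an integer factor:

/**
 * Illustrative only. With hash-modulo partitioning, if the partition count
 * grows from oldCount to newCount and newCount is a multiple of oldCount,
 * then any key that now lands in partition p previously landed in partition
 * p % oldCount. E.g. going from 6 to 12 partitions, the parent of partition
 * 7 is 7 % 6 = 1, so the consumer only needs to catch up on partition 1
 * before consuming the first message of partition 7.
 */
static int parentPartition(int newPartition, int oldCount, int newCount) {
    if (newCount % oldCount != 0) {
        throw new IllegalArgumentException(
            "single-parent mapping only holds when newCount is a multiple of oldCount");
    }
    return newPartition % oldCount;
}

With linear hashing, the hope is that a similarly small set of parent
partitions can be derived even when the new count is not an exact multiple
of the old one.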


>
> I think piggybacking the epoch exchanges onto the consumer heartbeats is a
> good idea. Just wanted to mention that consumers are not the only ones
> using the heartbeat API. For example, Kafka Connect also uses the group
> protocol to balance its load. Of course other use cases could leave these
> fields empty, but it's a little odd to have the protocol tailored
> specifically for one use case. To be honest, the group management protocol
> is one of the messier Kafka APIs and I don't think anyone is satisfied with
> the current approach. We need not redesign the whole thing in this KIP, but
> it might be nice to consider some options so that we're sure we're either
> heading in a better direction or at least not making things more confusing
> than they already are. The challenge is that it's useful to have some
> coordinator logic specific to the group type. I can imagine down the road
> that other use cases may also have some custom metadata which they need to
> piggyback on the heartbeat and they may also need the coordinator to do
> some facilitation. Maybe the heartbeat protocol could be left generic and
> we could have a separate module in the GroupCoordinator for custom consumer
> logic? Not too sure the best way to go.
>

Good point. I wasn't aware that HeartbeatRequest and HeartbeatResponse are
also used outside the consumer. Since they serve other purposes as well, I
have instead added ConsumerGroupPositionRequest and
ConsumerGroupPositionResponse as suggested.
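
To give a rough idea of the information the new request is meant to carry
(the field names below are purely illustrative; the authoritative schema is
on the KIP-253 wiki page), it is essentially the consumer reporting, per
assigned partition, how far it has consumed and which epoch it has
observed:

// Purely illustrative data shape, not the actual wire format.
public class ConsumerGroupPositionSketch {

    public static class PartitionPosition {
        public final String topic;
        public final int partition;
        public final long position;       // next offset the consumer will fetch
        public final int partitionEpoch;  // epoch the consumer has observed

        public PartitionPosition(String topic, int partition,
                                 long position, int partitionEpoch) {
            this.topic = topic;
            this.partition = partition;
            this.position = position;
            this.partitionEpoch = partitionEpoch;
        }
    }

    public final String groupId;
    public final String memberId;
    public final java.util.List<PartitionPosition> positions;

    public ConsumerGroupPositionSketch(String groupId, String memberId,
                                       java.util.List<PartitionPosition> positions) {
        this.groupId = groupId;
        this.memberId = memberId;
        this.positions = positions;
    }
}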



>
> Thanks,
> Jason
>
>
> On Tue, Feb 27, 2018 at 11:49 PM, Stephane Maarek <
> steph...@simplemachines.com.au> wrote:
>
> > Sounds awesome!
> > Are you planning to have auto-scaling of partitions in a following KIP?
> > That would be the holy grail.
> >
> > On 28 Feb. 2018 5:13 pm, "Dong Lin" <lindon...@gmail.com> wrote:
> >
> > > Hey Jan,
> > >
> > > I am not sure if it is acceptable for the producer to be stopped for a
> > > while, particularly for an online application which requires low
> > > latency. I am also not sure how consumers can switch to a new topic.
> > > Does the user application need to explicitly specify a different topic
> > > for the producer/consumer to subscribe to? It will be helpful for the
> > > discussion if you can provide more detail on the interface change for
> > > this solution.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <jan.filip...@trivago.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > just want to throw my thoughts in. In general the functionality is very
> > > > useful, though we should not try to pin the architecture down too hard
> > > > while implementing it.
> > > >
> > > > The manual steps would be to:
> > > >
> > > > create a new topic
> > > > then mirror-make from the old topic to the new topic
> > > > wait for mirror making to catch up
> > > > then put the consumers onto the new topic
> > > >     (having mirrormaker spit out a mapping from old offsets to new offsets:
> > > >         if the topic is increased by a factor of X there is going to be a
> > > >         clean mapping from 1 offset in the old topic to X offsets in the
> > > >         new topic;
> > > >         if there is no such factor then there is no way to generate a
> > > >         mapping that can be reasonably used for continuing)
> > > >     make consumers stop at appropriate points and continue consumption
> > > >     with offsets from the mapping
> > > > have the producers stop for a minimal time
> > > > wait for mirrormaker to finish
> > > > let the producers produce with the new metadata.
> > > >
> > > >
> > > > Instead of implementing the approach suggested in the KIP, which will
> > > > leave log-compacted topics completely crumbled and unusable, I would
> > > > much rather try to build infrastructure to support the operations
> > > > mentioned above more smoothly.
> > > > Especially having producers stop and use another topic is difficult, and
> > > > it would be nice if one could trigger "invalid metadata" exceptions for
> > > > them, and if one could give topics aliases so that their produce requests
> > > > against the old topic will arrive in the new topic.
> > > >
> > > > The downsides are obvious, I guess (having the same data twice for the
> > > > transition period, but Kafka tends to scale well with data size). Still,
> > > > it is a nicer fit into the architecture.
> > > >
> > > > I further want to argue that the functionality of the KIP can be
> > > > implemented completely in "userland" with a custom partitioner that
> > > > handles the transition as needed. I would appreciate it if someone could
> > > > point out what a custom partitioner couldn't handle in this case.
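
As a side note, a custom producer Partitioner along the lines Jan describes
might look roughly like the sketch below. It is purely illustrative (the
class name and config key are made up): it pins the key hash to the
pre-expansion partition count, which keeps existing keys on their old
partitions during the transition, but it also shows the limitation that
keyed traffic never reaches the new partitions until the frozen count is
bumped.

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

/**
 * Illustrative "userland" partitioner: during a partition expansion it keeps
 * hashing keys against the pre-expansion partition count, so records for an
 * existing key stay on their old partition until the operator cuts over.
 */
public class FrozenCountPartitioner implements Partitioner {

    private int frozenPartitionCount = -1;

    @Override
    public void configure(Map<String, ?> configs) {
        Object v = configs.get("frozen.partition.count"); // hypothetical config
        if (v != null)
            frozenPartitionCount = Integer.parseInt(v.toString());
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // Unkeyed records do not need ordering; use any partition.
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        int effective = frozenPartitionCount > 0
                ? Math.min(frozenPartitionCount, numPartitions)
                : numPartitions;
        // Same murmur2-modulo scheme as the default partitioner, but against
        // the frozen (pre-expansion) count while the transition is in progress.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % effective;
    }

    @Override
    public void close() {}
}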
> > > >
> > > > With the above approach, shrinking a topic involves the same steps,
> > > > without losing keys in the discontinued partitions.
> > > >
> > > > Would love to hear what everyone thinks.
> > > >
> > > > Best Jan
> > > >
> > > > On 11.02.2018 00:35, Dong Lin wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I have created KIP-253: Support in-order message delivery with partition
> > > >> expansion. See
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%
> > > >> 3A+Support+in-order+message+delivery+with+partition+expansion
> > > >> .
> > > >>
> > > >> This KIP provides a way to allow messages of the same key from the same
> > > >> producer to be consumed in the same order they are produced even if we
> > > >> expand the partitions of the topic.
> > > >>
> > > >> Thanks,
> > > >> Dong
> > > >>
> > > >>
> > > >
> > >
> >
>
