Hi Dong,

Great work on this proposal! Just a couple initial comments:

My understanding is that the consumer will block on a topic until all
partitions have reached a certain partition epoch. What are the
implications if a partition is offline? If we observe an epoch change while
a partition is offline, it seems like we'd have to wait until the partition
is back online before we can begin consuming the new epoch. Otherwise we
will violate the ordering guarantees. Many use cases involve unordered
data, so this would be a kind of regression in behavior, wouldn't it? A
couple ideas:

1. Maybe we could have a topic configuration that controls whether or not
ordering on the topic needs to be strictly followed? If we don't care about
ordering, the consumer need not synchronize on epoch boundaries and we need
not care about offline partitions.
2. Waiting on all partitions allows for any key partitioning function. It's
good because it's general, but it is overly conservative when the
partitioning function has finer control over key movement. For example, if
the partitioner only allows for splits, then there is just one partition to
await before consuming a new epoch for any given partition. I am not sure
what it would look like, but I'm wondering if it would be possible to
leverage the custom partitioning logic on the consumer side as well to
avoid unneeded waiting.
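Idea 2 can be made concrete with a small sketch (my own illustration, not
from the KIP; assuming the default modulo-style hash partitioner and a clean
factor-X split, with hypothetical function names):

```python
# Sketch: with a split-only expansion where new_count = X * old_count and a
# modulo hash partitioner, every key that hashes to new partition p used to
# live in old partition p % old_count. A consumer of new partition p therefore
# only needs to await that single "parent" partition's epoch boundary, rather
# than synchronizing on all partitions.

def partition(key_hash: int, num_partitions: int) -> int:
    """Default-style modulo partitioner."""
    return key_hash % num_partitions

def parent_partition(new_partition: int, old_count: int) -> int:
    """The one old partition whose keys can move into new_partition."""
    return new_partition % old_count

# Check the claim exhaustively for a split from 3 to 6 partitions.
old_count, factor = 3, 2
new_count = old_count * factor
for key_hash in range(1000):
    p_old = partition(key_hash, old_count)
    p_new = partition(key_hash, new_count)
    # a key's old partition is always the parent of its new partition
    assert parent_partition(p_new, old_count) == p_old
```

The congruence holds because old_count divides new_count, so hash % new_count
and hash % old_count agree modulo old_count; with an arbitrary partitioner
that guarantee disappears, which is why the general case must wait on all
partitions.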

I think piggybacking the epoch exchanges onto the consumer heartbeats is a
good idea. Just wanted to mention that consumers are not the only ones
using the heartbeat API. For example, Kafka Connect also uses the group
protocol to balance its load. Of course other use cases could leave these
fields empty, but it's a little odd to have the protocol tailored
specifically for one use case. To be honest, the group management protocol
is one of the messier Kafka APIs and I don't think anyone is satisfied with
the current approach. We need not redesign the whole thing in this KIP, but
it might be nice to consider some options so that we're sure we're either
heading in a better direction or at least not making things more confusing
than they already are. The challenge is that it's useful to have some
coordinator logic specific to the group type. I can imagine down the road
that other use cases may also have some custom metadata which they need to
piggyback on the heartbeat and they may also need the coordinator to do
some facilitation. Maybe the heartbeat protocol could be left generic and
we could have a separate module in the GroupCoordinator for custom consumer
logic? Not too sure the best way to go.
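One possible shape for the "generic heartbeat, separate consumer module"
idea, purely a sketch of my own (all class and method names are hypothetical,
not an existing Kafka API): the heartbeat carries a protocol type plus opaque
bytes, and the coordinator dispatches to whatever handler is registered for
that type:

```python
# Hypothetical sketch: the heartbeat stays generic (protocol type + opaque
# bytes), and the GroupCoordinator dispatches to a per-type handler, so
# consumer-specific epoch logic lives in its own module and Connect-style
# groups simply send no payload.

class GroupTypeHandler:
    """Interface for group-type-specific coordinator logic."""
    def on_heartbeat(self, member_id: str, metadata: bytes) -> bytes:
        raise NotImplementedError

class ConsumerEpochHandler(GroupTypeHandler):
    """Consumer-specific logic: track the max epoch seen in the group."""
    def __init__(self):
        self.max_epoch = 0
    def on_heartbeat(self, member_id: str, metadata: bytes) -> bytes:
        epoch = int.from_bytes(metadata, "big")
        self.max_epoch = max(self.max_epoch, epoch)
        # reply with the group-wide max epoch for the member to act on
        return self.max_epoch.to_bytes(4, "big")

class GroupCoordinator:
    """Generic coordinator: knows nothing about any one group type."""
    def __init__(self):
        self.handlers = {}
    def register(self, protocol_type: str, handler: GroupTypeHandler):
        self.handlers[protocol_type] = handler
    def heartbeat(self, protocol_type: str, member_id: str,
                  metadata: bytes = b"") -> bytes:
        handler = self.handlers.get(protocol_type)
        # unregistered types (e.g. "connect") just get an empty payload back
        return handler.on_heartbeat(member_id, metadata) if handler else b""

coordinator = GroupCoordinator()
coordinator.register("consumer", ConsumerEpochHandler())
reply = coordinator.heartbeat("consumer", "m1", (7).to_bytes(4, "big"))
```

This keeps the wire protocol untailored while still giving each group type a
place for its facilitation logic; whether that separation is workable inside
the real GroupCoordinator is exactly the open question.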

Thanks,
Jason


On Tue, Feb 27, 2018 at 11:49 PM, Stephane Maarek <
steph...@simplemachines.com.au> wrote:

> Sounds awesome!
> Are you planning to have auto scaling of partitions in a following KIP?
> That would be the holy grail.
>
> On 28 Feb. 2018 5:13 pm, "Dong Lin" <lindon...@gmail.com> wrote:
>
> > Hey Jan,
> >
> > I am not sure if it is acceptable for the producer to be stopped for a
> > while, particularly for online applications which require low latency. I
> > am also not sure how consumers can switch to a new topic. Does the user
> > application need to explicitly specify a different topic for the
> > producer/consumer to subscribe to? It will be helpful for discussion if
> > you can provide more detail on the interface change for this solution.
> >
> > Thanks,
> > Dong
> >
> > On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <jan.filip...@trivago.com>
> > wrote:
> >
> > > Hi,
> > >
> > > just want to throw my thought in. In general the functionality is very
> > > useful, though we should not try too hard to pin down the architecture
> > > while implementing.
> > >
> > > The manual steps would be to:
> > >
> > > create a new topic
> > > run mirrormaker from the old topic to the new topic
> > > wait for mirror making to catch up
> > > then put the consumers onto the new topic
> > >     (having mirrormaker spit out a mapping from old offsets to new
> > > offsets: if the partition count is increased by factor X there is going
> > > to be a clean mapping from 1 offset in the old topic to X offsets in
> > > the new topic; if there is no factor then there is no chance to
> > > generate a mapping that can be reasonably used for continuing)
> > > make consumers stop at appropriate points and continue consumption
> > > with offsets from the mapping
> > > have the producers stop for a minimal time
> > > wait for mirrormaker to finish
> > > let the producers produce with the new metadata.
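The offset-mapping step above could be sketched like this (a hypothetical
illustration of mine, assuming modulo hash partitioning and a clean factor-X
expansion; not actual MirrorMaker output):

```python
# Hypothetical sketch of the offset-mapping step: replay the old log in
# order, re-partition each record for the expanded topic, assign sequential
# offsets per new partition, and record (old_p, old_off) -> (new_p, new_off)
# so a stopped consumer knows where to resume.

def build_offset_mapping(old_log, old_count, factor):
    """old_log: dict old_partition -> list of key hashes in offset order."""
    new_count = old_count * factor
    next_offset = {p: 0 for p in range(new_count)}  # next free offset per new partition
    mapping = {}
    for old_p, records in old_log.items():
        for old_off, key_hash in enumerate(records):
            new_p = key_hash % new_count
            mapping[(old_p, old_off)] = (new_p, next_offset[new_p])
            next_offset[new_p] += 1
    return mapping

# Key hashes per old partition for a 3-partition topic doubled to 6.
old_log = {0: [0, 3, 6], 1: [1, 4], 2: [2, 5]}
mapping = build_offset_mapping(old_log, old_count=3, factor=2)
# A consumer stopped at (old_p, old_off) resumes at mapping[(old_p, old_off)].
```

Because each old partition is replayed in offset order and a key's records
all sit in one old partition, per-key order is preserved in the new topic;
without the clean factor relationship no such table can be built, which is
the caveat noted above.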
> > >
> > >
> > > Instead of implementing the approach suggested in the KIP, which will
> > > leave log compacted topics completely crumbled and unusable, I would
> > > much rather try to build infrastructure to support the operations
> > > mentioned above more smoothly.
> > > Especially having producers stop and use another topic is difficult,
> > > and it would be nice if one could trigger "invalid metadata" exceptions
> > > for them, and if one could give topics aliases so that produces to the
> > > old topic name will arrive in the new topic.
> > >
> > > The downsides are obvious I guess (having the same data twice for the
> > > transition period, but Kafka tends to scale well with data size). So
> > > it's a nicer fit into the architecture.
> > >
> > > I further want to argue that the functionality of the KIP can be
> > > implemented completely in "userland" with a custom partitioner that
> > > handles the transition as needed. I would appreciate it if someone
> > > could point out what a custom partitioner couldn't handle in this case?
> > >
> > > With the above approach, shrinking a topic becomes the same steps,
> > > without losing keys in the discontinued partitions.
> > >
> > > Would love to hear what everyone thinks.
> > >
> > > Best Jan
> > >
> > >
> > > On 11.02.2018 00:35, Dong Lin wrote:
> > >
> > >> Hi all,
> > >>
> > >> I have created KIP-253: Support in-order message delivery with
> > >> partition expansion. See
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> > >>
> > >> This KIP provides a way to allow messages of the same key from the
> > >> same producer to be consumed in the same order they are produced even
> > >> if we expand the partitions of the topic.
> > >>
> > >> Thanks,
> > >> Dong
> > >>
> > >>
> > >
> >
>