Hey Jason,

I agree. Even apart from this proposal, the partitioning strategy is really
an essential part of the metadata for a topic, and had we been less lazy we
probably would have included it with the topic metadata.

I think in terms of grandfathering this in, you could have existing topics
just be auto-assigned a "client" partitioning, and add a "linear" strategy
(or whatever) that is checked server-side and supported in terms of
re-partitioning.
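
Roughly what I have in mind, just as a sketch -- "partitioning.strategy" is a
made-up config key here, not an existing one:

// Using the Java AdminClient (org.apache.kafka.clients.admin).
Map<String, String> configs = new HashMap<>();
configs.put("partitioning.strategy", "linear");   // hypothetical config key
NewTopic topic = new NewTopic("clicks", 16, (short) 3).configs(configs);
adminClient.createTopics(Collections.singletonList(topic)).all().get();

Existing topics that don't declare anything would then behave as if they had
declared "client".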

-Jay

On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Great discussion. I think I'm wondering whether we can continue to leave
> Kafka agnostic to the partitioning strategy. The challenge is communicating
> the partitioning logic from producers to consumers so that the dependencies
> between each epoch can be determined. For the sake of discussion, imagine
> you did something like the following:
>
> 1. The name (and perhaps version) of a partitioning strategy is stored in
> topic configuration when a topic is created.
> 2. The producer looks up the partitioning strategy before writing to a
> topic and includes it in the produce request (for fencing). If it doesn't
> have an implementation for the configured strategy, it fails.
> 3. The consumer also looks up the partitioning strategy and uses it to
> determine dependencies when reading a new epoch. It could either fail or
> make the most conservative dependency assumptions if it doesn't know how to
> implement the partitioning strategy. For the consumer, the new interface
> might look something like this:
>
> // Return the partition dependencies following an epoch bump
> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
> int numPartitionsAfterEpochBump)
>
> The unordered case then is just a particular implementation which never has
> any epoch dependencies. To implement this, we would need some way for the
> consumer to find out how many partitions there were in each epoch, but
> maybe that's not too unreasonable.
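>
> Just to make the shape concrete (a sketch, not part of the proposal), a
> simple hash-mod strategy could implement it as:
>
> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
>                                          int numPartitionsAfterEpochBump) {
>     // With hash(key) % numPartitions, keys that now land in partition p
>     // used to land in partition p % numPartitionsBeforeEpochBump, so that
>     // is the only partition p depends on.
>     Map<Integer, List<Integer>> deps = new HashMap<>();
>     for (int p = 0; p < numPartitionsAfterEpochBump; p++)
>         deps.put(p, Collections.singletonList(p % numPartitionsBeforeEpochBump));
>     return deps;
> }
>
> The unordered strategy would just return a map of empty lists.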
>
> Thanks,
> Jason
>
>
> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
> > Hi Dong
> >
> > thank you very much for your questions.
> >
> > regarding the time spent copying data across:
> > It is correct that copying data from a topic with one partition mapping
> > to a topic with a different partition mapping takes far longer than we
> > can stop producers for. Tens of minutes is a very optimistic estimate
> > here. Many people cannot afford to copy at full speed and therefore
> > will have some rate limiting in place; this can push the timespan into
> > days. The good part is that the vast majority of the data can be copied
> > while the producers are still going. One can then piggyback the
> > consumers on top of this timeframe by the method mentioned (provide
> > them a mapping from their old offsets to the new offsets in the
> > repartitioned topics). In that way we separate migration of consumers
> > from migration of producers (decoupling these is what Kafka is
> > strongest at). The time to actually swap over the producers should be
> > kept minimal by ensuring that, when a swap attempt is started, the
> > copying consumer is very close to the log end and is expected to finish
> > within the next fetch. The operation should have a timeout and should
> > be retryable.
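> >
> > To illustrate the swap check (the copier being close to the log end) --
> > just a sketch using the plain consumer API; readyToSwap and
> > maxLagForSwap are made-up names:
> >
> > // copier is the consumer doing the copying
> > // (org.apache.kafka.clients.consumer.KafkaConsumer).
> > boolean readyToSwap(KafkaConsumer<?, ?> copier,
> >                     Collection<TopicPartition> partitions, long maxLagForSwap) {
> >     Map<TopicPartition, Long> logEnd = copier.endOffsets(partitions);
> >     for (TopicPartition tp : partitions) {
> >         if (logEnd.get(tp) - copier.position(tp) > maxLagForSwap)
> >             return false; // still too far behind the log end, retry later
> >     }
> >     return true;
> > }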
> >
> > Importance of log compaction:
> > If a producer produces key A to partition 0, it is going to be there
> > forever unless it gets deleted. The record might sit in there for
> > years. A new producer started with the new partitioning will fail to
> > delete the record in the correct partition: its tombstone lands in a
> > different partition than the old record. The record will be there
> > forever and one cannot reliably bootstrap new consumers. I cannot see
> > how linear hashing can solve this.
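> >
> > To make that concrete with the default partitioner (just an
> > illustration; key, keyBytes and producer are placeholders):
> >
> > // With hash-mod partitioning, the same key maps to different partitions
> > // before and after the expansion (org.apache.kafka.common.utils.Utils):
> > int before = Utils.toPositive(Utils.murmur2(keyBytes)) % 8;   // e.g. partition 0
> > int after  = Utils.toPositive(Utils.murmur2(keyBytes)) % 12;  // e.g. partition 4
> > // A tombstone produced now goes to the "after" partition, so compaction
> > // never removes the old record for this key in the "before" partition.
> > producer.send(new ProducerRecord<>("topic", key, null));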
> >
> > Regarding your skipping of userland copying:
> > 100%, copying the data across in userland is, as far as I can see, only
> > a use case for log-compacted topics. Even for log compaction plus
> > retention it should only be opt-in. Why did I bring it up? I think log
> > compaction is a very important feature for really embracing Kafka as a
> > "data platform". The point I also want to make is that copying data
> > this way is completely in line with the Kafka architecture: it only
> > consists of reading from and writing to topics.
> >
> > I hope this clarifies why I think we should aim for more than the
> > current KIP. I fear that once the KIP is done, not much more effort
> > will be taken.
> >
> >
> >
> >
> > On 04.03.2018 02:28, Dong Lin wrote:
> >
> >> Hey Jan,
> >>
> >> In the current proposal, the consumer will be blocked on waiting for
> >> other consumers of the group to consume up to a given offset. In most
> >> cases, all consumers should be close to the LEO of the partitions when
> >> the partition expansion happens. Thus the time spent waiting should not
> >> be long, e.g. on the order of seconds. On the other hand, it may take a
> >> long time to wait for the entire partition to be copied -- the amount
> >> of time is proportional to the amount of existing data in the
> >> partition, which can take tens of minutes. So the amount of time that
> >> we stop consumers may not be on the same order of magnitude.
> >>
> >> If we can implement this suggestion without copying data over in pure
> >> userland, it will be much more valuable. Do you have ideas on how this
> >> can be done?
> >>
> >> Not sure why the current KIP does not help people who depend on log
> >> compaction. Could you elaborate more on this point?
> >>
> >> Thanks,
> >> Dong
> >>
> >> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <jan.filip...@trivago.com>
> >> wrote:
> >>
> >> Hi Dong,
> >>>
> >>> I tried to focus on what steps one can currently perform to expand or
> >>> shrink a keyed topic while maintaining top-notch semantics.
> >>> I can understand that there might be confusion about "stopping the
> >>> consumer". It is exactly the same as proposed in the KIP: there needs
> >>> to be a time the producers agree on the new partitioning. The extra
> >>> semantics I want to put in there is that we have a possibility to wait
> >>> until all the existing data is copied over into the new partitioning
> >>> scheme. When I say stopping, I think more of having a memory barrier
> >>> that ensures the ordering. I am still aiming for latencies on the
> >>> scale of leader failovers.
> >>>
> >>> Consumers have to explicitly adapt to the new partitioning scheme in
> >>> the above scenario. The reason is that in the cases where you are
> >>> dependent on a particular partitioning scheme, you frequently also
> >>> have other topics with co-partitioning requirements or the like.
> >>> Therefore all your other input topics might need to grow accordingly.
> >>>
> >>>
> >>> What I was suggesting was to streamline all these operations as well
> >>> as possible to have "real" partition growth and shrinkage going on.
> >>> Migrating the producers to a new partitioning scheme can be much more
> >>> streamlined with proper broker support for this. Migrating consumers
> >>> is a step that might be made completely unnecessary if - for example,
> >>> Streams - takes the gcd as the partitioning scheme instead of
> >>> enforcing 1 to 1. Connect consumers and other consumers should be
> >>> fine anyway.
> >>>
> >>> I hope this makes it clearer what I was aiming at. The rest needs to
> >>> be figured out. The only danger I see is that if we introduce this
> >>> feature as proposed in the KIP, it won't help people depending on log
> >>> compaction.
> >>>
> >>> The other thing I wanted to mention is that I believe the current
> >>> suggestion (without copying data over) can be implemented in pure
> >>> userland with a custom partitioner and a small feedback loop from
> >>> ProduceResponse => Partitioner, in cooperation with a change
> >>> management system.
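> >>>
> >>> A rough sketch of such a partitioner (the partition counts and the
> >>> switch-over signal would come from the change management system;
> >>> names here are made up):
> >>>
> >>> import java.util.Map;
> >>> import org.apache.kafka.clients.producer.Partitioner;
> >>> import org.apache.kafka.common.Cluster;
> >>> import org.apache.kafka.common.utils.Utils;
> >>>
> >>> public class TransitionAwarePartitioner implements Partitioner {
> >>>     // Updated by the external change-management system.
> >>>     private volatile int oldCount = 8;
> >>>     private volatile int newCount = 16;
> >>>     private volatile boolean switched = false;
> >>>
> >>>     @Override
> >>>     public int partition(String topic, Object key, byte[] keyBytes,
> >>>                          Object value, byte[] valueBytes, Cluster cluster) {
> >>>         // Keep producing with the old count until the switch-over,
> >>>         // then use the new count for all subsequent records.
> >>>         int count = switched ? newCount : oldCount;
> >>>         return Utils.toPositive(Utils.murmur2(keyBytes)) % count;
> >>>     }
> >>>
> >>>     @Override
> >>>     public void configure(Map<String, ?> configs) {}
> >>>
> >>>     @Override
> >>>     public void close() {}
> >>> }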
> >>>
> >>> Best Jan
> >>>
> >>>
> >>> On 28.02.2018 07:13, Dong Lin wrote:
> >>>
> >>> Hey Jan,
> >>>>
> >>>> I am not sure if it is acceptable for the producer to be stopped for
> >>>> a while, particularly for online applications which require low
> >>>> latency. I am also not sure how consumers can switch to a new topic.
> >>>> Does the user application need to explicitly specify a different
> >>>> topic for the producer/consumer to subscribe to? It will be helpful
> >>>> for the discussion if you can provide more detail on the interface
> >>>> change for this solution.
> >>>>
> >>>> Thanks,
> >>>> Dong
> >>>>
> >>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
> >>>> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>>> just want to throw my thoughts in. In general the functionality is
> >>>>> very useful; we should, though, not try to fight the architecture
> >>>>> too hard while implementing it.
> >>>>>
> >>>>> The manual steps would be to:
> >>>>>
> >>>>> create a new topic
> >>>>> then mirrormaker from the old topic to the new topic
> >>>>> wait for the mirroring to catch up
> >>>>> then put the consumers onto the new topic
> >>>>>       (having mirrormaker spit out a mapping from old offsets to new
> >>>>>       offsets: if the topic is increased by a factor X there is going
> >>>>>       to be a clean mapping from 1 offset in the old topic to X
> >>>>>       offsets in the new topic; if there is no such factor there is
> >>>>>       no way to generate a mapping that can reasonably be used for
> >>>>>       continuing)
> >>>>>       make consumers stop at appropriate points and continue
> >>>>>       consumption with offsets from the mapping
> >>>>> have the producers stop for a minimal time
> >>>>> wait for mirrormaker to finish
> >>>>> let the producers produce with the new metadata
> >>>>>
> >>>>>
> >>>>> Instead of implementing the approach suggested in the KIP, which
> >>>>> will leave log-compacted topics completely crumbled and unusable, I
> >>>>> would much rather try to build infrastructure to support the
> >>>>> operations mentioned above more smoothly.
> >>>>> Especially having producers stop and use another topic is difficult,
> >>>>> and it would be nice if one could trigger "invalid metadata"
> >>>>> exceptions for them, and if one could give topics aliases so that
> >>>>> their produces to the old topic will arrive in the new topic.
> >>>>>
> >>>>> The downsides are obvious, I guess (having the same data twice for
> >>>>> the transition period, but Kafka tends to scale well with data
> >>>>> size). So it is a nicer fit into the architecture.
> >>>>>
> >>>>> I further want to argue that the functionality of the KIP can be
> >>>>> implemented completely in "userland" with a custom partitioner that
> >>>>> handles the transition as needed. I would appreciate it if someone
> >>>>> could point out what a custom partitioner couldn't handle in this
> >>>>> case.
> >>>>>
> >>>>> With the above approach, shrinking a topic involves the same steps,
> >>>>> without losing keys in the discontinued partitions.
> >>>>>
> >>>>> Would love to hear what everyone thinks.
> >>>>>
> >>>>> Best Jan
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 11.02.2018 00:35, Dong Lin wrote:
> >>>>>
> >>>>> Hi all,
> >>>>>
> >>>>>> I have created KIP-253: Support in-order message delivery with
> >>>>>> partition expansion. See
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> >>>>>> .
> >>>>>>
> >>>>>> This KIP provides a way to allow messages of the same key from the
> >>>>>> same producer to be consumed in the same order they are produced,
> >>>>>> even if we expand the partitions of the topic.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Dong
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >
>
