Hey Jason,

I agree. Even apart from this proposal, the partitioning strategy is really an essential part of the metadata for a topic, and had we been less lazy we probably would have included it with the topic metadata.
I think in terms of grandfathering this in, you could have existing topics just be auto-assigned a "client" partitioning, and add a "linear" strategy (or whatever) that is checked server-side and supported in terms of re-partitioning.

-Jay

On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Great discussion. I think I'm wondering whether we can continue to leave
> Kafka agnostic to the partitioning strategy. The challenge is communicating
> the partitioning logic from producers to consumers so that the dependencies
> between each epoch can be determined. For the sake of discussion, imagine
> you did something like the following:
>
> 1. The name (and perhaps version) of a partitioning strategy is stored in
> topic configuration when a topic is created.
> 2. The producer looks up the partitioning strategy before writing to a
> topic and includes it in the produce request (for fencing). If it doesn't
> have an implementation for the configured strategy, it fails.
> 3. The consumer also looks up the partitioning strategy and uses it to
> determine dependencies when reading a new epoch. It could either fail or
> make the most conservative dependency assumptions if it doesn't know how to
> implement the partitioning strategy. For the consumer, the new interface
> might look something like this:
>
> // Return the partition dependencies following an epoch bump
> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
>     int numPartitionsAfterEpochBump)
>
> The unordered case then is just a particular implementation which never has
> any epoch dependencies. To implement this, we would need some way for the
> consumer to find out how many partitions there were in each epoch, but
> maybe that's not too unreasonable.
>
> Thanks,
> Jason
>
> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
>
> > Hi Dong,
> >
> > thank you very much for your questions.
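For concreteness, Jason's `dependencies` interface above could be implemented for a plain "key % numPartitions" strategy roughly as follows. This is only a sketch — the class name and method placement are assumptions, not an actual Kafka API. After a bump from n to m partitions, a key landing in new partition q may previously have hashed to any old partition p with p % gcd(n, m) == q % gcd(n, m).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the proposed consumer-side interface for the
// plain "key % numPartitions" strategy. The class name is made up; this
// is not an actual Kafka API.
class ModuloPartitioningStrategy {

    // Return the partition dependencies following an epoch bump:
    // new partition -> old partitions whose records must be consumed first.
    Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                             int numPartitionsAfterEpochBump) {
        int g = gcd(numPartitionsBeforeEpochBump, numPartitionsAfterEpochBump);
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int q = 0; q < numPartitionsAfterEpochBump; q++) {
            List<Integer> olds = new ArrayList<>();
            for (int p = 0; p < numPartitionsBeforeEpochBump; p++) {
                // Keys with k % m == q have k % n ranging over exactly the
                // residues congruent to q modulo gcd(n, m).
                if (p % g == q % g) {
                    olds.add(p);
                }
            }
            deps.put(q, olds);
        }
        return deps;
    }

    private static int gcd(int a, int b) {
        return b == 0 ? a : gcd(b, a % b);
    }
}
```

Doubling from 2 to 4 partitions, for example, gives new partition 2 a dependency on old partition 0 only, so a consumer need not drain old partition 1 before reading it.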
> > Regarding the time spent copying data across:
> > It is correct that copying data from a topic with one partition
> > mapping to a topic with a different partition mapping takes way
> > longer than we can stop producers. Tens of minutes is a very
> > optimistic estimate here. Many people cannot afford to copy at full
> > steam and therefore will have some rate limiting in place; this can
> > push the timespan into days. The good part is that the vast majority
> > of the data can be copied while the producers are still going. One
> > can then piggyback the consumers on top of this timeframe, by the
> > method mentioned (provide them a mapping from their old offsets to
> > new offsets in their repartitioned topics). In that way we separate
> > migration of consumers from migration of producers (decoupling these
> > is what Kafka is strongest at). The time to actually swap over the
> > producers should be kept minimal by ensuring that when a swap
> > attempt is started, the consumer copying over is very close to the
> > log end and is expected to finish within the next fetch. The
> > operation should have a time-out and should be reattemptable.
> >
> > Importance of log compaction:
> > If a producer produces key A to partition 0, it is forever going to
> > be there unless it gets deleted. The record might sit in there for
> > years. A new producer started with the new partitions will fail to
> > delete the record in the correct partition. The record will be there
> > forever and one cannot reliably bootstrap new consumers. I cannot
> > see how linear hashing can solve this.
> >
> > Regarding your skipping of userland copying:
> > 100%: copying the data across in userland is, as far as I can see,
> > only a use case for log-compacted topics. Even for log compaction
> > plus retention it should only be opt-in. Why did I bring it up? I
> > think log compaction is a very important feature to really embrace
> > Kafka as a "data platform".
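Jan's compaction point can be made concrete with a toy computation. This simplifies the real placement logic — Kafka's DefaultPartitioner uses murmur2 on the serialized key, while plain `hashCode` here is only for illustration:

```java
// Toy illustration of the log-compaction problem described above,
// using a simplified hash(key) % numPartitions placement. (The real
// Kafka DefaultPartitioner uses murmur2 on the serialized key; plain
// hashCode here is only for illustration.)
class CompactionExample {

    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int before = partitionFor("A", 2); // partition the record was written to
        int after = partitionFor("A", 3);  // partition a tombstone lands in after expansion
        // When the two differ, the tombstone cannot compact the old
        // record away: the record sits in a partition the producer no
        // longer addresses for this key, so it survives forever.
        System.out.println("before=" + before + ", after=" + after);
    }
}
```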
> > The point I also want to make is that copying data this way is
> > completely in line with the Kafka architecture: it only consists of
> > reading from and writing to topics.
> >
> > I hope this clarifies more why I think we should aim for more than
> > the current KIP. I fear that once the KIP is done, not much more
> > effort will be taken.
> >
> > On 04.03.2018 02:28, Dong Lin wrote:
> >
> >> Hey Jan,
> >>
> >> In the current proposal, the consumer will be blocked on waiting
> >> for other consumers of the group to consume up to a given offset.
> >> In most cases, all consumers should be close to the LEO of the
> >> partitions when the partition expansion happens. Thus the time
> >> spent waiting should not be long, e.g. on the order of seconds. On
> >> the other hand, it may take a long time to wait for the entire
> >> partition to be copied -- the amount of time is proportional to the
> >> amount of existing data in the partition, which can take tens of
> >> minutes. So the amounts of time that we stop consumers may not be
> >> on the same order of magnitude.
> >>
> >> If we can implement this suggestion without copying data over in
> >> pure userland, it will be much more valuable. Do you have ideas on
> >> how this can be done?
> >>
> >> Not sure why the current KIP does not help people who depend on log
> >> compaction. Could you elaborate more on this point?
> >>
> >> Thanks,
> >> Dong
> >>
> >> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak
> >> <jan.filip...@trivago.com> wrote:
> >>
> >>> Hi Dong,
> >>>
> >>> I tried to focus on the steps one can currently perform to expand
> >>> or shrink a keyed topic while maintaining top-notch semantics. I
> >>> can understand that there might be confusion about "stopping the
> >>> consumer". It is exactly the same as proposed in the KIP: there
> >>> needs to be a time the producers agree on the new partitioning.
> >>> The extra semantics I want to put in there is that we have a
> >>> possibility to wait until all the existing data is copied over
> >>> into the new partitioning scheme. When I say stopping, I think
> >>> more of having a memory barrier that ensures the ordering. I am
> >>> still aiming for latencies on the scale of leader failovers.
> >>>
> >>> Consumers have to explicitly adopt the new partitioning scheme in
> >>> the above scenario. The reason is that in these cases, where you
> >>> are dependent on a particular partitioning scheme, you frequently
> >>> also have other topics with co-partitioning enforcements or the
> >>> like. Therefore all your other input topics might need to grow
> >>> accordingly.
> >>>
> >>> What I was suggesting was to streamline all these operations as
> >>> best as possible to have "real" partition growth and shrinkage
> >>> going on. Migrating the producers to a new partitioning scheme can
> >>> be much more streamlined with proper broker support for this.
> >>> Migrating consumers is a step that might be made completely
> >>> unnecessary if -- for example Streams -- takes the gcd as the
> >>> partitioning scheme instead of enforcing 1 to 1. Connect consumers
> >>> and other consumers should be fine anyway.
> >>>
> >>> I hope this makes more clear what I was aiming at. The rest needs
> >>> to be figured out. The only danger I see is that when we introduce
> >>> this feature as proposed in the KIP, it won't help any people
> >>> depending on log compaction.
> >>>
> >>> The other thing I wanted to mention is that I believe the current
> >>> suggestion (without copying data over) can be implemented in pure
> >>> userland with a custom partitioner and a small feedback loop from
> >>> ProduceResponse => Partitioner, in cooperation with a change
> >>> management system.
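The "userland" loop Jan sketches at the end — a custom partitioner plus a feedback path that flips it over once the old scheme is rejected — might look roughly like this in outline. Everything here is hypothetical (class and method names, the trigger); the real hook would be the producer's pluggable Partitioner interface together with error handling around produce responses.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch only: a partitioner that keeps routing by the old
// partition count until a change-management signal (e.g. a rejection
// surfaced from a ProduceResponse) declares the new count authoritative.
class TransitionAwarePartitioner {
    private final AtomicInteger effectiveCount;

    TransitionAwarePartitioner(int oldPartitionCount) {
        this.effectiveCount = new AtomicInteger(oldPartitionCount);
    }

    // Route by whichever partition count is currently in effect.
    int partition(byte[] serializedKey) {
        int hash = Arrays.hashCode(serializedKey) & 0x7fffffff;
        return hash % effectiveCount.get();
    }

    // Called by the feedback loop at cut-over time.
    void cutOverTo(int newPartitionCount) {
        effectiveCount.set(newPartitionCount);
    }
}
```

The AtomicInteger acts as the "memory barrier" Jan alludes to: every produce after the cut-over sees the new count, with no window in which some records use the old scheme and some the new.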
> >>> Best Jan
> >>>
> >>> On 28.02.2018 07:13, Dong Lin wrote:
> >>>
> >>>> Hey Jan,
> >>>>
> >>>> I am not sure if it is acceptable for the producer to be stopped
> >>>> for a while, particularly for online applications which require
> >>>> low latency. I am also not sure how consumers can switch to a new
> >>>> topic. Does the user application need to explicitly specify a
> >>>> different topic for the producer/consumer to subscribe to? It
> >>>> will be helpful for discussion if you can provide more detail on
> >>>> the interface change for this solution.
> >>>>
> >>>> Thanks,
> >>>> Dong
> >>>>
> >>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak
> >>>> <Jan.Filipiak@trivago.com> wrote:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> just want to throw my thoughts in. In general the functionality
> >>>>> is very useful; we should, though, not try to pin the
> >>>>> architecture down too hard while implementing.
> >>>>>
> >>>>> The manual steps would be to:
> >>>>>
> >>>>>   create a new topic
> >>>>>   mirror-make from the old topic to the new topic
> >>>>>   wait for mirror making to catch up
> >>>>>   then put the consumers onto the new topic
> >>>>>     (having MirrorMaker spit out a mapping from old offsets to
> >>>>>     new offsets: if the topic is increased by factor X, there is
> >>>>>     going to be a clean mapping from 1 offset in the old topic
> >>>>>     to X offsets in the new topic; if there is no factor, then
> >>>>>     there is no chance to generate a mapping that can reasonably
> >>>>>     be used for continuing)
> >>>>>   make consumers stop at appropriate points and continue
> >>>>>     consumption with offsets from the mapping
> >>>>>   have the producers stop for a minimal time
> >>>>>   wait for MirrorMaker to finish
> >>>>>   let producers produce with the new metadata
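The offset mapping in the steps above (one old offset to X new offsets when the partition count grows by factor X) could be produced by the copier as it goes. A sketch under that interpretation — all names are hypothetical, and no real MirrorMaker feature emits such a mapping:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the offset-mapping step above. As the copier
// moves records out of one old partition into its X successor
// partitions, it snapshots, for each old offset, the next write
// position in every successor. A consumer whose next unconsumed old
// offset is o then resumes each successor at resumePositions(o).
class OffsetMappingBuilder {
    private final long[] nextNewOffset;              // per successor partition
    private final Map<Long, long[]> mapping = new HashMap<>();

    OffsetMappingBuilder(int expansionFactorX) {
        this.nextNewOffset = new long[expansionFactorX];
    }

    // Record that the record at oldOffset was copied to successor
    // partition targetIndex (0..X-1).
    void recordCopy(long oldOffset, int targetIndex) {
        mapping.put(oldOffset, nextNewOffset.clone()); // positions before this record
        nextNewOffset[targetIndex]++;
    }

    // Offsets at which to resume each successor partition for a consumer
    // whose next unconsumed old offset is oldOffset.
    long[] resumePositions(long oldOffset) {
        return mapping.get(oldOffset);
    }
}
```

This also shows why a non-integer growth factor breaks the scheme Jan describes: with X successors per old partition the mapping stays small and per-partition, while an arbitrary reshuffle would need a mapping entry per record.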
> >>>>> Instead of implementing the approach suggested in the KIP, which
> >>>>> will leave log-compacted topics completely crumbled and
> >>>>> unusable, I would much rather try to build infrastructure to
> >>>>> support the operations mentioned above more smoothly.
> >>>>> Especially having producers stop and use another topic is
> >>>>> difficult, and it would be nice if one could trigger "invalid
> >>>>> metadata" exceptions for them, and if one could give topics
> >>>>> aliases so that their produces with the old topic will arrive in
> >>>>> the new topic.
> >>>>>
> >>>>> The downsides are obvious, I guess (having the same data twice
> >>>>> for the transition period, but Kafka tends to scale well with
> >>>>> data size). So it's a nicer fit into the architecture.
> >>>>>
> >>>>> I further want to argue that the functionality in the KIP can be
> >>>>> implemented completely in "userland" with a custom partitioner
> >>>>> that handles the transition as needed. I would appreciate it if
> >>>>> someone could point out what a custom partitioner couldn't
> >>>>> handle in this case.
> >>>>>
> >>>>> With the above approach, shrinking a topic becomes the same
> >>>>> steps, without losing keys in the discontinued partitions.
> >>>>>
> >>>>> Would love to hear what everyone thinks.
> >>>>>
> >>>>> Best Jan
> >>>>>
> >>>>> On 11.02.2018 00:35, Dong Lin wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> I have created KIP-253: Support in-order message delivery with
> >>>>>> partition expansion. See
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> >>>>>> This KIP provides a way to allow messages of the same key from
> >>>>>> the same producer to be consumed in the same order they are
> >>>>>> produced, even if we expand the partitions of the topic.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Dong