Hi, Matthias,

My understanding is that in Kafka Streams, the only case where a changelog topic needs to be compacted is when the corresponding input is a KTable. In all other cases, the changelog topics use compaction + deletion. So, if most KTables are not high volume, there may not be a need to expand their partitions, and therefore no need to expand the partitions of the corresponding changelog topics.
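For reference, such a compacted + deleted changelog topic can be created like this with the Java AdminClient (a sketch only; the topic name, counts and retention below are made up):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateChangelogTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs = new HashMap<>();
            // "compact,delete" keeps the latest record per key and also expires old segments.
            configs.put("cleanup.policy", "compact,delete");
            configs.put("retention.ms", "604800000"); // 7 days, made up
            NewTopic changelog = new NewTopic("my-app-my-store-changelog", 8, (short) 3).configs(configs);
            admin.createTopics(Collections.singleton(changelog)).all().get();
        }
    }
}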
Thanks,

Jun

On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Jun,
>
> thanks for your comment. This should actually work for Streams, because we don't rely on producer "hashing" but specify the partition number explicitly on send().
>
> About not allowing to change the number of partitions for changelog topics: for Streams, this seems to imply that we need to create a second changelog topic for each store with the new partition count. However, it would be unclear when/if we can delete the old topic. Thus, it moves the "problem" into the application layer. It's hard to judge for me atm what the impact would be, but it's something we should pay attention to.
>
> -Matthias
>
> On 3/6/18 3:45 PM, Jun Rao wrote:
> > Hi, Matthias,
> >
> > Regarding your comment "If it would be time-delay based, it might be problematic for Kafka Streams: if we get the information that the new input partitions are available for producing, we need to enable the new changelog partitions for producing, too. If those would not be available yet, because the time-delay did not trigger yet, it would be problematic to avoid crashing.", could you just enable the changelog topic to write to its new partitions immediately? The input topic can be configured with a delay in writing to the new partitions. Initially, there won't be new data produced into the newly added partitions in the input topic. However, we could prebuild the state for the new input partitions and write the state changes to the corresponding new partitions in the changelog topic.
> >
> > Hi, Jan,
> >
> > For a compacted topic, garbage collecting the old keys in the existing partitions after partition expansion can be tricky, as you pointed out. A few options here. (a) Let brokers exchange keys across brokers during compaction. This will add complexity on the broker side. (b) Build an external tool that scans the compacted topic and drops the prefix of a partition if all records in the prefix are removable. The admin can then run this tool when the unneeded space needs to be reclaimed. (c) Don't support partition changes in compacted topics. This might be ok since most compacted topics are not high volume.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> >> Hi everyone,
> >>
> >> Thanks for all the comments! It appears that everyone prefers linear hashing because it reduces the amount of state that needs to be moved between consumers (for stream processing). The KIP has been updated to use linear hashing.
> >>
> >> Regarding the migration effort: it seems that migrating the producer library to use linear hashing should be pretty straightforward, without much operational effort. If we don't upgrade the client library to use this KIP, we cannot support in-order delivery after the partition count is changed anyway. Suppose we upgrade the client library to use this KIP: if the partition number is not changed, the key -> partition mapping will be exactly the same as it is now, because it is still determined using murmur_hash(key) % original_partition_num. In other words, this change is backward compatible.
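> >>
> >> For illustration, here is a minimal sketch of that lookup (assuming Kafka's default murmur2 hash and that partitions are only ever added; an illustration, not the KIP's exact code):
> >>
> >> import org.apache.kafka.common.utils.Utils;
> >>
> >> public final class LinearHashing {
> >>     // Keys stay in their old partition unless that partition has been split.
> >>     public static int partitionFor(byte[] keyBytes, int originalCount, int currentCount) {
> >>         int hash = Utils.toPositive(Utils.murmur2(keyBytes)); // same hash as the DefaultPartitioner
> >>         int n = originalCount;
> >>         while (n < currentCount) n <<= 1; // smallest power-of-two multiple of originalCount >= currentCount
> >>         int p = hash % n;
> >>         return p < currentCount ? p : hash % (n >> 1); // partition not split yet: fall back one level
> >>     }
> >> }
> >>
> >> E.g. growing from 4 to 6 partitions: keys with hash % 8 in 0..5 land in that partition, while 6 and 7 fall back to hash % 4, so only partitions 0 and 1 have been split.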
> >>
> >> Regarding the load distribution: if we use linear hashing, the load may be unevenly distributed, because those partitions which are not split may receive twice as much traffic as the partitions that are split. This issue can be mitigated by creating topics with a partition count that is several times the number of consumers. And there will be no imbalance if the partition number is always doubled. So this imbalance seems acceptable.
> >>
> >> Regarding storing the partition strategy as a per-topic config: it seems unnecessary, since we can still use murmur_hash as the default hash function and additionally apply the linear hashing algorithm if the partition number has increased. Not sure if there is any use-case for the producer to use a different hash function. Jason, can you check if there is some use-case that I missed for using the per-topic partition strategy?
> >>
> >> Regarding how to reduce latency (due to state store save/load) in the stream processing consumer when the partition number changes: I need to read the Kafka Streams code to understand how Kafka Streams currently migrates state between consumers when an application instance is added/removed for a given job. I will reply after I finish reading the documentation and code.
> >>
> >> Thanks,
> >> Dong
> >>
> >> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >>
> >>> Great discussion. I'm wondering whether we can continue to leave Kafka agnostic to the partitioning strategy. The challenge is communicating the partitioning logic from producers to consumers so that the dependencies between each epoch can be determined. For the sake of discussion, imagine you did something like the following:
> >>>
> >>> 1. The name (and perhaps version) of a partitioning strategy is stored in topic configuration when a topic is created.
> >>> 2. The producer looks up the partitioning strategy before writing to a topic and includes it in the produce request (for fencing). If it doesn't have an implementation for the configured strategy, it fails.
> >>> 3. The consumer also looks up the partitioning strategy and uses it to determine dependencies when reading a new epoch. It could either fail or make the most conservative dependency assumptions if it doesn't know how to implement the partitioning strategy. For the consumer, the new interface might look something like this:
> >>>
> >>> // Return the partition dependencies following an epoch bump
> >>> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump, int numPartitionsAfterEpochBump)
> >>>
> >>> The unordered case then is just a particular implementation which never has any epoch dependencies. To implement this, we would need some way for the consumer to find out how many partitions there were in each epoch, but maybe that's not too unreasonable.
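> >>>
> >>> To make that concrete, here is a sketch of what a linear-hashing implementation of this interface could return, restricted to power-of-two growth (an illustration of the idea, not a worked-out proposal):
> >>>
> >>> import java.util.Collections;
> >>> import java.util.HashMap;
> >>> import java.util.List;
> >>> import java.util.Map;
> >>>
> >>> public class LinearHashingStrategy {
> >>>     // For each partition after the bump, the partition whose earlier messages
> >>>     // must be consumed first to preserve per-key ordering.
> >>>     public Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump, int numPartitionsAfterEpochBump) {
> >>>         int factor = numPartitionsAfterEpochBump / numPartitionsBeforeEpochBump;
> >>>         if (numPartitionsAfterEpochBump % numPartitionsBeforeEpochBump != 0 || Integer.bitCount(factor) != 1) {
> >>>             throw new IllegalArgumentException("sketch only covers power-of-two growth");
> >>>         }
> >>>         Map<Integer, List<Integer>> deps = new HashMap<>();
> >>>         for (int p = 0; p < numPartitionsAfterEpochBump; p++) {
> >>>             // With linear hashing, keys now in partition p previously lived in p % oldCount.
> >>>             deps.put(p, Collections.singletonList(p % numPartitionsBeforeEpochBump));
> >>>         }
> >>>         return deps;
> >>>     }
> >>> }
> >>>
> >>> E.g. dependencies(4, 8) maps partition 5 to [1]: a consumer should not read partition 5's new epoch before it has consumed partition 1 up to the bump.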
> >>>
> >>> Thanks,
> >>> Jason
> >>>
> >>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:
> >>>
> >>>> Hi Dong,
> >>>>
> >>>> thank you very much for your questions.
> >>>>
> >>>> Regarding the time spent copying data across: it is correct that copying data from a topic with one partition mapping to a topic with a different partition mapping takes way longer than we can stop producers. Tens of minutes is a very optimistic estimate here. Many people cannot afford to copy at full steam and therefore will have some rate limiting in place; this can bump the timespan into days. The good part is that the vast majority of the data can be copied while the producers are still going. One can then piggyback the consumers on top of this timeframe, by the method mentioned (provide them a mapping from their old offsets to new offsets in their repartitioned topics). In that way we separate migration of consumers from migration of producers (decoupling these is what Kafka is strongest at). The time to actually swap over the producers should be kept minimal by ensuring that when a swap attempt is started, the consumer copying over is very close to the log end and is expected to finish within the next fetch. The operation should have a time-out and should be re-attemptable.
> >>>>
> >>>> Importance of log compaction: if a producer produces key A to partition 0, it's forever going to be there, unless it gets deleted. The record might sit in there for years. A new producer started with the new partitions will fail to delete the record in the correct partition. The record will be there forever and one cannot reliably bootstrap new consumers. I cannot see how linear hashing can solve this.
> >>>>
> >>>> Regarding your skipping of userland copying: 100%, copying the data across in userland is, as far as I can see, only a use case for log-compacted topics. Even for log compaction + retention it should only be opt-in. Why did I bring it up? I think log compaction is a very important feature to really embrace Kafka as a "data platform". The point I also want to make is that copying data this way is completely in line with the Kafka architecture: it only consists of reading from and writing to topics.
> >>>>
> >>>> I hope this clarifies more why I think we should aim for more than the current KIP. I fear that once the KIP is done, not much more effort will be taken.
> >>>>
> >>>> On 04.03.2018 02:28, Dong Lin wrote:
> >>>>
> >>>>> Hey Jan,
> >>>>>
> >>>>> In the current proposal, the consumer will be blocked waiting for other consumers of the group to consume up to a given offset. In most cases, all consumers should be close to the LEO of the partitions when the partition expansion happens. Thus the time waiting should not be long, e.g. on the order of seconds. On the other hand, it may take a long time to wait for the entire partition to be copied -- the amount of time is proportional to the amount of existing data in the partition, which can take tens of minutes. So these two amounts of time may not be on the same order of magnitude.
> >>>>>
> >>>>> If we can implement this suggestion without copying data over in pure userland, it will be much more valuable. Do you have ideas on how this can be done?
> >>>>>
> >>>>> Not sure why the current KIP does not help people who depend on log compaction. Could you elaborate more on this point?
> >>>>>
> >>>>> Thanks,
> >>>>> Dong
> >>>>>
> >>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> >>>>>
> >>>>>> Hi Dong,
> >>>>>>
> >>>>>> I tried to focus on the steps one can currently perform to expand or shrink a keyed topic while maintaining top-notch semantics. I can understand that there might be confusion about "stopping the consumer". It is exactly the same as proposed in the KIP: there needs to be a time at which the producers agree on the new partitioning. The extra semantics I want to put in there is that we have a possibility to wait until all the existing data is copied over into the new partitioning scheme. When I say stopping, I think more of having a memory barrier that ensures the ordering. I am still aiming for latencies on the scale of leader failovers.
> >>>>>>
> >>>>>> Consumers have to explicitly adapt to the new partitioning scheme in the above scenario. The reason is that in these cases where you depend on a particular partitioning scheme, you also have other topics with co-partitioning requirements or the like, frequently. Therefore all your other input topics might need to grow accordingly.
> >>>>>>
> >>>>>> What I was suggesting was to streamline all these operations as best as possible, to have "real" partition growth and shrinkage going on. Migrating the producers to a new partitioning scheme can be much more streamlined with proper broker support for this. Migrating consumers is a step that might be made completely unnecessary if, for example, Streams takes the gcd as the partitioning scheme instead of enforcing 1:1. Connect consumers and other consumers should be fine anyway.
> >>>>>>
> >>>>>> I hope this makes clearer what I was aiming at. The rest needs to be figured out. The only danger I see is that if we introduce this feature as proposed in the KIP, it won't help any people depending on log compaction.
> >>>>>>
> >>>>>> The other thing I wanted to mention is that I believe the current suggestion (without copying data over) can be implemented in pure userland with a custom partitioner and a small feedback loop from ProduceResponse => Partitioner, in cooperation with a change management system.
> >>>>>>
> >>>>>> Best Jan
> >>>>>>
> >>>>>> On 28.02.2018 07:13, Dong Lin wrote:
> >>>>>>
> >>>>>>> Hey Jan,
> >>>>>>>
> >>>>>>> I am not sure if it is acceptable for the producer to be stopped for a while, particularly for online applications which require low latency. I am also not sure how consumers can switch to a new topic. Does the user application need to explicitly specify a different topic for the producer/consumer to subscribe to? It will be helpful for the discussion if you can provide more detail on the interface change for this solution.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Dong
> >>>>>>>
> >>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> just want to throw my thoughts in. In general the functionality is very useful; we should, though, not try to bend the architecture too hard while implementing it.
> >>>>>>>>
> >>>>>>>> The manual steps would be to:
> >>>>>>>>
> >>>>>>>>     create a new topic
> >>>>>>>>     then mirrormake from the old topic to the new topic
> >>>>>>>>     wait for mirror making to catch up
> >>>>>>>>     then put the consumers onto the new topic
> >>>>>>>>         (having mirrormaker spit out a mapping from old offsets to new offsets:
> >>>>>>>>         if the topic is increased by factor X, there is going to be a clean mapping from 1 offset in the old topic to X offsets in the new topic;
> >>>>>>>>         if there is no integer factor, then there is no chance to generate a mapping that can reasonably be used for continuing)
> >>>>>>>>         make consumers stop at appropriate points and continue consumption with offsets from the mapping
> >>>>>>>>     have the producers stop for a minimal time
> >>>>>>>>     wait for mirrormaker to finish
> >>>>>>>>     let the producers produce with the new metadata
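> >>>>>>>>
> >>>>>>>> A sketch of the offset bookkeeping such a mirroring job could do (the class and method names are made up for illustration; this is not an existing MirrorMaker feature):
> >>>>>>>>
> >>>>>>>> import java.util.HashMap;
> >>>>>>>> import java.util.Map;
> >>>>>>>>
> >>>>>>>> // While re-partitioning one old partition into X new partitions, remember for
> >>>>>>>> // every old offset how far each new partition had advanced at that point.
> >>>>>>>> public class OffsetMapping {
> >>>>>>>>     private final Map<Long, long[]> mapping = new HashMap<>();
> >>>>>>>>     private final long[] nextNewOffset; // next offset per target partition
> >>>>>>>>
> >>>>>>>>     public OffsetMapping(int factorX) {
> >>>>>>>>         this.nextNewOffset = new long[factorX];
> >>>>>>>>     }
> >>>>>>>>
> >>>>>>>>     // Call once per copied record, after choosing its target partition.
> >>>>>>>>     // A real tool would checkpoint sparsely instead of storing every offset.
> >>>>>>>>     public void record(long oldOffset, int targetPartition) {
> >>>>>>>>         mapping.put(oldOffset, nextNewOffset.clone());
> >>>>>>>>         nextNewOffset[targetPartition]++;
> >>>>>>>>     }
> >>>>>>>>
> >>>>>>>>     // Where a consumer committed at oldOffset should resume in each new partition.
> >>>>>>>>     public long[] resumeOffsetsFor(long committedOldOffset) {
> >>>>>>>>         return mapping.get(committedOldOffset);
> >>>>>>>>     }
> >>>>>>>> }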
> >>>>>>>>
> >>>>>>>> Instead of implementing the approach suggested in the KIP, which will leave log-compacted topics completely crumbled and unusable, I would much rather try to build infrastructure to support the operations mentioned above more smoothly. Especially having producers stop and use another topic is difficult, and it would be nice if one could trigger "invalid metadata" exceptions for them, and if one could give topics aliases so that produces with the old topic name will arrive in the new topic.
> >>>>>>>>
> >>>>>>>> The downsides are obvious, I guess (having the same data twice for the transition period, but Kafka tends to scale well with data size). So it's a nicer fit into the architecture.
> >>>>>>>>
> >>>>>>>> I further want to argue that the functionality of the KIP can be completely implemented in "userland" with a custom partitioner that handles the transition as needed. I would appreciate it if someone could point out what a custom partitioner couldn't handle in this case.
> >>>>>>>>
> >>>>>>>> With the above approach, shrinking a topic becomes the same steps, without losing keys in the discontinued partitions.
> >>>>>>>>
> >>>>>>>> Would love to hear what everyone thinks.
> >>>>>>>>
> >>>>>>>> Best Jan
> >>>>>>>>
> >>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> I have created KIP-253: Support in-order message delivery with partition expansion. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion.
> >>>>>>>>>
> >>>>>>>>> This KIP provides a way to allow messages of the same key from the same producer to be consumed in the same order they are produced, even if we expand the partitions of the topic.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Dong