Hi, Matthias,

My understanding is that in KStream, the only case when a changelog topic
needs to be compacted is when the corresponding input is a KTable. In all
other cases, the changelog topics use compaction + deletion. So, if most
KTables are not high volume, there may not be a need to expand their
partitions and therefore the partitions of the corresponding changelog
topics.

Thanks,

Jun

On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Jun,
>
> thanks for your comment. This should actually work for Streams, because
> we don't rely on producer "hashing" but specify the partition number
> explicitly on send().
>
> About not allowing the number of partitions to change for changelog
> topics: for Streams, this seems to imply that we need to create a second
> changelog topic for each store with the new partition count. However, it
> would be unclear when/if we can delete the old topic. Thus, it moves the
> "problem" into the application layer. It's hard to judge for me atm what
> the impact would be, but it's something we should pay attention to.
>
>
> -Matthias
>
> On 3/6/18 3:45 PM, Jun Rao wrote:
> > Hi, Matthias,
> >
> > Regarding your comment "If it would be time-delay based, it might be
> > problematic for Kafka Streams: if we get the information that the new
> > input partitions are available for producing, we need to enable the new
> > changelog partitions for producing, too. If those would not be available
> > yet, because the time-delay did not trigger yet, it would be problematic
> > to avoid crashing.", could you just enable the changelog topic to write
> > to its new partitions immediately? The input topic can be configured
> > with a delay in writing to the new partitions. Initially, there won't be
> > new data produced into the newly added partitions in the input topic.
> > However, we could prebuild the state for the new input partition and
> > write the state changes to the corresponding new partitions in the
> > changelog topic.
> >
> > Hi, Jan,
> >
> > For a compacted topic, garbage collecting the old keys in the existing
> > partitions after partition expansion can be tricky, as you pointed out.
> > A few options here. (a) Let brokers exchange keys across brokers during
> > compaction. This will add complexity on the broker side. (b) Build an
> > external tool that scans the compacted topic and drops the prefix of a
> > partition if all records in the prefix are removable. The admin can then
> > run this tool when the unneeded space needs to be reclaimed. (c) Don't
> > support partition change in a compacted topic. This might be ok since
> > most compacted topics are not high volume.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin <lindon...@gmail.com> wrote:
> >
> >> Hi everyone,
> >>
> >> Thanks for all the comments! It appears that everyone prefers linear
> >> hashing because it reduces the amount of state that needs to be moved
> >> between consumers (for stream processing). The KIP has been updated to
> >> use linear hashing.
> >>
> >> Regarding the migration effort: it seems that migrating the producer
> >> library to use linear hashing should be pretty straightforward without
> >> much operational effort. If we don't upgrade the client library to use
> >> this KIP, we cannot support in-order delivery after the partition
> >> number is changed anyway. Suppose we upgrade the client library to use
> >> this KIP: if the partition number is not changed, the key -> partition
> >> mapping will be exactly the same as it is now because it is still
> >> determined using murmur_hash(key) % original_partition_num. In other
> >> words, this change is backward compatible.
> >>
> >> Regarding the load distribution: if we use linear hashing, the load may
> >> be unevenly distributed because those partitions which are not split
> >> may receive twice as much traffic as other partitions that are split.
> >> This issue can be mitigated by creating topics with a partition count
> >> that is several times the number of consumers. And there will be no
> >> imbalance if the partition number is always doubled. So this imbalance
> >> seems acceptable.
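
The key -> partition mapping described in the two paragraphs above can be sketched as follows. This is only an illustration of one common formulation of linear hashing, not necessarily the exact algorithm in the KIP. The class and method names are hypothetical, and the caller is assumed to pass an already non-negative key hash (for example a murmur hash with the sign bit cleared).

```java
// Hypothetical sketch of linear hashing for partition selection.
// Assumption: partitions are added one at a time and split in order 0, 1, 2, ...
final class LinearHashingSketch {

    /**
     * Maps a non-negative key hash to a partition.
     *
     * @param positiveHash       non-negative hash of the key (assumed input)
     * @param originalPartitions partition count when the topic was created
     * @param currentPartitions  partition count now (>= originalPartitions)
     */
    public static int partition(int positiveHash, int originalPartitions, int currentPartitions) {
        if (positiveHash < 0 || originalPartitions <= 0 || currentPartitions < originalPartitions) {
            throw new IllegalArgumentException("invalid hash or partition counts");
        }
        // Largest value originalPartitions * 2^k that does not exceed currentPartitions.
        long roundSize = originalPartitions;
        while (roundSize * 2 <= currentPartitions) {
            roundSize *= 2;
        }
        // Partitions [0, splitPointer) have already been split in the current round.
        long splitPointer = currentPartitions - roundSize;
        long p = positiveHash % roundSize;
        if (p < splitPointer) {
            // This partition was split, so use the finer-grained modulus.
            p = positiveHash % (roundSize * 2);
        }
        return (int) p;
    }

    public static void main(String[] args) {
        int original = 4;
        for (int current : new int[] {4, 6, 8}) {
            StringBuilder sb = new StringBuilder("partitions=" + current + ":");
            for (int hash = 0; hash < 8; hash++) {
                sb.append(' ').append(hash).append("->").append(partition(hash, original, current));
            }
            System.out.println(sb);
        }
    }
}
```

With an unchanged partition count the result equals hash % original_partition_num, which is the backward-compatibility property mentioned above. With 4 original and 6 current partitions, partitions 2 and 3 are not yet split and therefore cover twice the hash range of the others.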
> >>
> >> Regarding storing the partition strategy as per-topic config: it seems
> >> not necessary since we can still use murmur_hash as the default hash
> >> function and additionally apply the linear hashing algorithm if the
> >> partition number has increased. Not sure if there is any use-case for
> >> the producer to use a different hash function. Jason, can you check if
> >> there is some use-case that I missed for using the per-topic partition
> >> strategy?
> >>
> >> Regarding how to reduce latency (due to state store/load) in the stream
> >> processing consumer when the partition number changes: I need to read
> >> the Kafka Streams code to understand how Kafka Streams currently
> >> migrates state between consumers when the application is added/removed
> >> for a given job. I will reply after I finish reading the documentation
> >> and code.
> >>
> >>
> >> Thanks,
> >> Dong
> >>
> >>
> >> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson <ja...@confluent.io>
> >> wrote:
> >>
> >>> Great discussion. I think I'm wondering whether we can continue to
> >>> leave Kafka agnostic to the partitioning strategy. The challenge is
> >>> communicating the partitioning logic from producers to consumers so
> >>> that the dependencies between each epoch can be determined. For the
> >>> sake of discussion, imagine you did something like the following:
> >>>
> >>> 1. The name (and perhaps version) of a partitioning strategy is stored
> >>> in topic configuration when a topic is created.
> >>> 2. The producer looks up the partitioning strategy before writing to a
> >>> topic and includes it in the produce request (for fencing). If it
> >>> doesn't have an implementation for the configured strategy, it fails.
> >>> 3. The consumer also looks up the partitioning strategy and uses it to
> >>> determine dependencies when reading a new epoch. It could either fail
> >>> or make the most conservative dependency assumptions if it doesn't
> >>> know how to implement the partitioning strategy. For the consumer, the
> >>> new interface might look something like this:
> >>>
> >>> // Return the partition dependencies following an epoch bump
> >>> Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
> >>>                                          int numPartitionsAfterEpochBump)
> >>>
> >>> The unordered case then is just a particular implementation which never
> >>> has any epoch dependencies. To implement this, we would need some way
> >>> for the consumer to find out how many partitions there were in each
> >>> epoch, but maybe that's not too unreasonable.
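
As a rough illustration of the interface sketched above, here are two possible implementations. Everything beyond the `dependencies` signature is an assumption made for this example: the type names are hypothetical, the map is assumed to go from a partition in the new epoch to the partitions of the previous epoch that must be consumed first, and the ordered variant assumes plain `hash % numPartitions` partitioning. Under that assumption a key can move from old partition q to new partition p only if q and p are congruent modulo gcd(before, after).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical interface name; only the method signature comes from the thread.
interface PartitioningStrategy {
    // Return the partition dependencies following an epoch bump.
    // Assumed meaning: new-epoch partition -> previous-epoch partitions to drain first.
    Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                             int numPartitionsAfterEpochBump);
}

// The unordered case: no epoch dependencies at all.
final class UnorderedStrategy implements PartitioningStrategy {
    @Override
    public Map<Integer, List<Integer>> dependencies(int before, int after) {
        return Collections.emptyMap();
    }
}

// Ordered case for plain "hash % numPartitions" partitioning (an assumption).
final class ModuloHashStrategy implements PartitioningStrategy {
    @Override
    public Map<Integer, List<Integer>> dependencies(int before, int after) {
        if (before <= 0 || after <= 0) {
            throw new IllegalArgumentException("partition counts must be positive");
        }
        int g = gcd(before, after);
        Map<Integer, List<Integer>> deps = new TreeMap<>();
        for (int p = 0; p < after; p++) {
            // Old partitions that can hold keys now routed to new partition p.
            // Note: this includes p itself when p existed in the previous epoch.
            List<Integer> oldPartitions = new ArrayList<>();
            for (int q = p % g; q < before; q += g) {
                oldPartitions.add(q);
            }
            deps.put(p, oldPartitions);
        }
        return deps;
    }

    static int gcd(int a, int b) {
        while (b != 0) {
            int t = a % b;
            a = b;
            b = t;
        }
        return a;
    }
}

final class DependenciesDemo {
    public static void main(String[] args) {
        PartitioningStrategy strategy = new ModuloHashStrategy();
        System.out.println("2 -> 4 partitions: " + strategy.dependencies(2, 4));
        System.out.println("2 -> 3 partitions: " + strategy.dependencies(2, 3));
        System.out.println("unordered:         " + new UnorderedStrategy().dependencies(2, 4));
    }
}
```

When the partition count doubles, each new partition depends on exactly one old partition. When the counts are coprime, every new partition depends on every old one, which corresponds to the most conservative assumption mentioned in step 3.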
> >>>
> >>> Thanks,
> >>> Jason
> >>>
> >>>
> >>> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak <jan.filip...@trivago.com>
> >>> wrote:
> >>>
> >>>> Hi Dong
> >>>>
> >>>> thank you very much for your questions.
> >>>>
> >>>> Regarding the time spent copying data across:
> >>>> It is correct that copying data from a topic with one partition
> >>>> mapping to a topic with a different partition mapping takes way longer
> >>>> than we can stop producers. Tens of minutes is a very optimistic
> >>>> estimate here. Many people cannot afford to copy at full steam and
> >>>> therefore will have some rate limiting in place; this can bump the
> >>>> timespan into days. The good part is that the vast majority of the
> >>>> data can be copied while the producers are still going. One can then
> >>>> piggyback the consumers on top of this timeframe, by the method
> >>>> mentioned (provide them a mapping from their old offsets to new
> >>>> offsets in their repartitioned topics). In that way we separate
> >>>> migration of consumers from migration of producers (decoupling these
> >>>> is what Kafka is strongest at). The time to actually swap over the
> >>>> producers should be kept minimal by ensuring that when a swap attempt
> >>>> is started the consumer copying over should be very close to the log
> >>>> end and is expected to finish within the next fetch. The operation
> >>>> should have a time-out and should be "reattemptable".
> >>>>
> >>>> Importance of log compaction:
> >>>> If a producer produces key A to partition 0, it's forever gonna be
> >>>> there, unless it gets deleted. The record might sit in there for
> >>>> years. A new producer started with the new partitions will fail to
> >>>> delete the record in the correct partition. The record will be there
> >>>> forever and one cannot reliably bootstrap new consumers. I cannot see
> >>>> how linear hashing can solve this.
> >>>>
> >>>> Regarding your skipping of userland copying:
> >>>> 100%, copying the data across in userland is, as far as I can see,
> >>>> only a use case for log-compacted topics. Even for log compaction +
> >>>> retention it should only be opt-in. Why did I bring it up? I think log
> >>>> compaction is a very important feature to really embrace Kafka as a
> >>>> "data platform". The point I also want to make is that copying data
> >>>> this way is completely in line with the Kafka architecture. It only
> >>>> consists of reading and writing to topics.
> >>>>
> >>>> I hope it clarifies more why I think we should aim for more than the
> >>>> current KIP. I fear that once the KIP is done not much more effort
> >>>> will be taken.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On 04.03.2018 02:28, Dong Lin wrote:
> >>>>
> >>>>> Hey Jan,
> >>>>>
> >>>>> In the current proposal, the consumer will be blocked on waiting for
> >>>>> other consumers of the group to consume up to a given offset. In most
> >>>>> cases, all consumers should be close to the LEO of the partitions
> >>>>> when the partition expansion happens. Thus the time waiting should
> >>>>> not be long, e.g. on the order of seconds. On the other hand, it may
> >>>>> take a long time to wait for the entire partition to be copied -- the
> >>>>> amount of time is proportional to the amount of existing data in the
> >>>>> partition, which can take tens of minutes. So the amount of time that
> >>>>> we stop consumers may not be on the same order of magnitude.
> >>>>>
> >>>>> If we can implement this suggestion without copying data over in pure
> >>>>> userland, it will be much more valuable. Do you have ideas on how
> >>>>> this can be done?
> >>>>>
> >>>>> Not sure why the current KIP does not help people who depend on log
> >>>>> compaction. Could you elaborate more on this point?
> >>>>>
> >>>>> Thanks,
> >>>>> Dong
> >>>>>
> >>>>> On Wed, Feb 28, 2018 at 10:55 PM, Jan Filipiak <Jan.Filipiak@trivago.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Dong,
> >>>>>>
> >>>>>> I tried to focus on what steps one can currently perform to expand
> >>>>>> or shrink a keyed topic while maintaining top-notch semantics.
> >>>>>> I can understand that there might be confusion about "stopping the
> >>>>>> consumer". It is exactly the same as proposed in the KIP: there needs
> >>>>>> to be a time the producers agree on the new partitioning. The extra
> >>>>>> semantics I want to put in there is that we have a possibility to
> >>>>>> wait until all the existing data is copied over into the new
> >>>>>> partitioning scheme. When I say stopping I think more of having a
> >>>>>> memory barrier that ensures the ordering. I am still aiming for
> >>>>>> latencies on the scale of leader failovers.
> >>>>>>
> >>>>>> Consumers have to explicitly adopt the new partitioning scheme in
> >>>>>> the above scenario. The reason is that in these cases where you are
> >>>>>> dependent on a particular partitioning scheme, you frequently also
> >>>>>> have other topics that have co-partitioning enforcements or the
> >>>>>> like. Therefore all your other input topics might need to grow
> >>>>>> accordingly.
> >>>>>>
> >>>>>>
> >>>>>> What I was suggesting was to streamline all these operations as best
> >>>>>> as possible to have "real" partition growth and shrinkage going on.
> >>>>>> Migrating the producers to a new partitioning scheme can be much
> >>>>>> more streamlined with proper broker support for this. Migrating
> >>>>>> consumers is a step that might be made completely unnecessary if -
> >>>>>> for example - Streams takes the gcd as partitioning scheme instead
> >>>>>> of enforcing 1 to 1. Connect consumers and other consumers should
> >>>>>> be fine anyways.
> >>>>>>
> >>>>>> I hope this makes more clear what I was aiming at. The rest needs
> >>>>>> to be figured out. The only danger I see is that when we are
> >>>>>> introducing this feature as proposed in the KIP, it won't help any
> >>>>>> people depending on log compaction.
> >>>>>>
> >>>>>> The other thing I wanted to mention is that I believe the current
> >>>>>> suggestion (without copying data over) can be implemented in pure
> >>>>>> userland with a custom partitioner and a small feedback loop from
> >>>>>> ProduceResponse => Partitioner in cooperation with a change
> >>>>>> management system.
> >>>>>>
> >>>>>> Best Jan
> >>>>>>
> >>>>>>
> >>>>>> On 28.02.2018 07:13, Dong Lin wrote:
> >>>>>>
> >>>>>>> Hey Jan,
> >>>>>>>
> >>>>>>> I am not sure if it is acceptable for the producer to be stopped for
> >>>>>>> a while, particularly for online applications which require low
> >>>>>>> latency. I am also not sure how consumers can switch to a new topic.
> >>>>>>> Does the user application need to explicitly specify a different
> >>>>>>> topic for the producer/consumer to subscribe to? It will be helpful
> >>>>>>> for discussion if you can provide more detail on the interface
> >>>>>>> change for this solution.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Dong
> >>>>>>>
> >>>>>>> On Mon, Feb 26, 2018 at 12:48 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> just want to throw my thoughts in. In general the functionality is
> >>>>>>>> very useful; we should though not try too hard to find the
> >>>>>>>> architecture while implementing.
> >>>>>>>>
> >>>>>>>> The manual steps would be to
> >>>>>>>>
> >>>>>>>> create a new topic
> >>>>>>>> then mirrormake from the old topic to the new topic
> >>>>>>>> wait for mirror making to catch up.
> >>>>>>>> then put the consumers onto the new topic
> >>>>>>>>       (having mirrormaker spit out a mapping from old offsets to
> >>>>>>>>       new offsets:
> >>>>>>>>           if topic is increased by factor X there is gonna be a
> >>>>>>>>           clean mapping from 1 offset in the old topic to X offsets
> >>>>>>>>           in the new topic,
> >>>>>>>>           if there is no factor then there is no chance to
> >>>>>>>>           generate a mapping that can be reasonably used for
> >>>>>>>>           continuing)
> >>>>>>>>       make consumers stop at appropriate points and continue
> >>>>>>>>       consumption with offsets from the mapping.
> >>>>>>>> have the producers stop for a minimal time.
> >>>>>>>> wait for mirrormaker to finish
> >>>>>>>> let producer produce with the new metadata.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Instead of implementing the approach suggested in the KIP, which
> >>>>>>>> will leave log compacted topics completely crumbled and unusable,
> >>>>>>>> I would much rather try to build infrastructure to support the
> >>>>>>>> above-mentioned operations more smoothly.
> >>>>>>>> Especially having producers stop and use another topic is
> >>>>>>>> difficult, and it would be nice if one could trigger "invalid
> >>>>>>>> metadata" exceptions for them and if one could give topics aliases
> >>>>>>>> so that their produces to the old topic will arrive in the new
> >>>>>>>> topic.
> >>>>>>>>
> >>>>>>>> The downsides are obvious I guess (having the same data twice for
> >>>>>>>> the transition period, but Kafka tends to scale well with
> >>>>>>>> datasize). So it's a nicer fit into the architecture.
> >>>>>>>>
> >>>>>>>> I further want to argue that the functionality of the KIP can be
> >>>>>>>> implemented completely in "userland" with a custom partitioner that
> >>>>>>>> handles the transition as needed. I would appreciate it if someone
> >>>>>>>> could point out what a custom partitioner couldn't handle in this
> >>>>>>>> case.
> >>>>>>>>
> >>>>>>>> With the above approach, shrinking a topic becomes the same steps,
> >>>>>>>> without losing keys in the discontinued partitions.
> >>>>>>>>
> >>>>>>>> Would love to hear what everyone thinks.
> >>>>>>>>
> >>>>>>>> Best Jan
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 11.02.2018 00:35, Dong Lin wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> I have created KIP-253: Support in-order message delivery with
> >>>>>>>>> partition expansion. See
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-253%3A+Support+in-order+message+delivery+with+partition+expansion
> >>>>>>>>> .
> >>>>>>>>>
> >>>>>>>>> This KIP provides a way to allow messages of the same key from the
> >>>>>>>>> same producer to be consumed in the same order they are produced
> >>>>>>>>> even if we expand the partitions of the topic.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Dong
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>
> >>>
> >>
> >
>
>
