Hi, Dong,

Thanks for the proposal. Looks good overall. A couple of comments.

1. Where is the low_watermark checkpointed? Is that
in replication-offset-checkpoint? If so, do we need to bump up the version?
Could you also describe the format change?
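
For reference, replication-offset-checkpoint is currently a plain-text file:
a version number on the first line, an entry count on the second, then one
"topic partition offset" line per partition. Purely as an illustration (not
necessarily what the KIP proposes), a bumped version could append
low_watermark as an extra column:

    1                       <- version, bumped from 0
    2                       <- number of entries
    topic1 0 4000 1500      <- topic partition high_watermark low_watermark
    topic1 1 5500 2000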

2. For topics with "delete" retention, currently we let each replica delete
old segments independently. With low_watermark, we could just let the leader
delete old segments through the deletion policy and have the followers
simply delete old segments based on low_watermark. Not sure if this saves
much, but it's a potential option that may be worth thinking about.
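
As a minimal sketch of the follower side (hypothetical Log/LogSegment types,
not actual broker code):

    // The leader has already declared everything below low_watermark
    // purgeable, so a follower could drop any whole segment that ends
    // before that offset.
    void maybeDeleteOldSegments(Log log, long leaderLowWatermark) {
        for (LogSegment segment : log.segments()) {
            if (segment.lastOffset() < leaderLowWatermark) {
                log.deleteSegment(segment);
            }
        }
    }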

Jun



On Wed, Jan 4, 2017 at 8:13 AM, radai <radai.rosenbl...@gmail.com> wrote:

> one more example of complicated config - mirror maker.
>
> we definitely can't trust each and every topic owner to configure their
> topics not to purge before they've been mirrored.
> which would mean there's a per-topic config (set by the owner) and a
> "global" config (where mirror makers are specified) and they need to be
> "merged" for those topics that _are_ mirrored.
> this is a changing set of topics that's stored in an external system
> outside of kafka.
> if a topic is taken out of the mirror set the MM offset would be "frozen"
> at that point and prevent clean-up for all eternity, unless it's cleaned up
> itself.
>
> ...
>
> complexity :-)
>
> On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenbl...@gmail.com> wrote:
>
> > in summary - i'm not opposed to the idea of a per-topic clean-up config
> > that tracks some set of consumer groups' offsets (which would probably work
> > for 80% of use cases), but i definitely see a need to expose a simple API
> > for the more advanced/obscure/custom use cases (the other 20%).
> >
> > On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenbl...@gmail.com> wrote:
> >
> >> a major motivation for this KIP is cost savings.
> >>
> >> lots of internal systems at LI use kafka as an intermediate pipe, and set
> >> the topic retention period to a "safe enough" amount of time to be able to
> >> recover from crashes/downtime and catch up to "now". this results in a few
> >> days' worth of retention typically.
> >>
> >> however, under normal operating conditions the consumers are mostly
> >> caught up, so early clean-up enables big cost savings in storage.
> >>
> >> as for my points:
> >>
> >> 1. when discussing implementation options for automatic clean-up we
> >> realized that cleaning up by keeping track of offsets stored in kafka
> >> requires some per-topic config - you need to specify which groups to track.
> >> this becomes a problem because:
> >>     1.1 - relatively complicated code, to be written in the broker.
> >>     1.2 - configuration needs to be kept up to date by topic
> >> "owners" - of which we have thousands. failure to do so would decrease the
> >> cost benefit.
> >>     1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
> >> workflow where they will reset their offsets to an earlier value than the
> >> one stored. this means that a stored offset of X does not always mean you
> >> can clean up to X-1. think of it as video encoding - some apps have "key
> >> frames" they may seek back to which are before their current offset.
> >>     1.4 - there are multiple possible strategies - you could clean up
> >> aggressively, retain some "time distance" from latest, some "offset
> >> distance", etc. we think this would have made it very hard to agree on a
> >> single "correct" implementation that everyone would be happy with. it would
> >> be better to include the raw functionality in the API and leave the
> >> "brains" to an external monitoring system where people could custom-tailor
> >> their logic (see the sketch at the end of this mail).
> >>
> >> 2. ad-hoc consumer groups: it's common practice for devs to spin up
> >> console consumers and connect to a topic as a debug aid. SREs may also do
> >> this. there are also various other eco-system applications that may
> >> consume from topics (unknown to topic owners as those are infra monitoring
> >> tools). obviously such consumer groups' offsets should be ignored for
> >> purposes of clean-up, but coming up with a bullet-proof way to do this is
> >> non-trivial, and again ties in with the implementation complexity and
> >> inflexibility of a "one size fits all" solution in 1.4 above.
> >>
> >> 3. forceful clean-up: we have workflows that use kafka to move gigantic
> >> blobs from offline hadoop processing flows into online systems. the data
> >> being "loaded" into such an online system can be several GBs in size and
> >> take a long time to consume (they are sliced into many small msgs).
> >> sometimes the sender wants to abort and start a new blob before the
> >> current load process has completed - meaning the consumer's offsets are
> >> not yet caught up.
> >>
> >> 4. offsets outside of kafka: yes, you could force applications to store
> >> their offsets twice, but that's inefficient. it's better to expose a raw,
> >> simple API and let such applications manage their own clean-up logic (this
> >> again ties into 1.4 and the lack of a "one size fits all" solution).
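> >>
> >> to make 1.4 concrete, here's a rough sketch of how an external service
> >> could implement a "time distance from latest" policy on top of the
> >> proposed API. the wiring is hypothetical, and purgeDataBefore()'s exact
> >> signature is whatever the KIP settles on; offsetsForTimes() is the real
> >> consumer API for translating a timestamp into an offset:
> >>
> >>     // keep everything newer than (now - retentionMs) for one partition
> >>     long cutoff = System.currentTimeMillis() - retentionMs;
> >>     Map<TopicPartition, OffsetAndTimestamp> byTime =
> >>         consumer.offsetsForTimes(Collections.singletonMap(partition, cutoff));
> >>     OffsetAndTimestamp target = byTime.get(partition);
> >>     if (target != null) {
> >>         // purge everything before the offset corresponding to the cutoff
> >>         adminClient.purgeDataBefore(Collections.singletonMap(partition, target.offset()));
> >>     }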
> >>
> >> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <e...@confluent.io>
> >>> wrote:
> >>>
> >>> > > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>> >
> >>> > > Hey Ewen,
> >>> > >
> >>> > > Thanks for the review. As Radai explained, it would be complex in
> >>> > > terms of user configuration if we were to use committed offsets to
> >>> > > decide data deletion. We need a way to specify which groups need to
> >>> > > consume data of this partition. The broker would also need to consume
> >>> > > the entire offsets topic in that approach, which has some overhead. I
> >>> > > don't think it is that hard to implement. But it will likely take more
> >>> > > time to discuss that approach due to the new config and the
> >>> > > server-side overhead.
> >>> > >
> >>> > > We chose to put this API in AdminClient because the API is more like
> >>> > > an administrative operation (such as listGroups, deleteTopics) than a
> >>> > > consumer operation. It is not necessarily called only by a consumer.
> >>> > > For example, we can implement the "delete data before committed
> >>> > > offset" approach by running an external service which calls the
> >>> > > purgeDataBefore() API based on the committed offsets of consumer
> >>> > > groups.
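> >>> > >
> >>> > > For illustration only (hypothetical names; the exact
> >>> > > purgeDataBefore() signature is an assumption, not the final API):
> >>> > >
> >>> > >     // Purge a partition up to the minimum committed offset across
> >>> > >     // the consumer groups the service is configured to track.
> >>> > >     // committedOffset() is a stand-in for looking up a group's
> >>> > >     // committed offset, e.g. from the offsets topic.
> >>> > >     long purgeTo = Long.MAX_VALUE;
> >>> > >     for (String group : trackedGroups) {
> >>> > >         Long committed = committedOffset(group, partition);
> >>> > >         if (committed == null) return; // a tracked group hasn't consumed yet
> >>> > >         purgeTo = Math.min(purgeTo, committed);
> >>> > >     }
> >>> > >     adminClient.purgeDataBefore(Collections.singletonMap(partition, purgeTo));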
> >>> > >
> >>> > > I was not aware that AdminClient is not a public API. Supposing it is
> >>> > > not public now, I assume we plan to make it public in the future as
> >>> > > part of KIP-4. Are we not making it public because its interface is
> >>> > > not stable? If so, can we just tag this new API as not stable in the
> >>> > > code?
> >>> > >
> >>> >
> >>> >
> >>> > The AdminClient planned for KIP-4 is a new Java-based implementation.
> >>> > It's definitely confusing that both will be (could be?) named
> >>> > AdminClient, but we've kept the existing Scala AdminClient out of the
> >>> > public API and have not required KIPs for changes to it.
> >>> >
> >>> > That said, I agree that if this type of API makes it into Kafka,
> >>> > having a (new, Java-based) AdminClient method would definitely be a
> >>> > good idea. An alternative path might be to have a Consumer-based
> >>> > implementation since that seems like a very intuitive, natural way to
> >>> > use the protocol. I think optimizing for the expected use case would
> >>> > be a good idea.
> >>> >
> >>> > -Ewen
> >>> >
> >>>
> >>> Are you saying that the Scala AdminClient is not a public API and we
> >>> discourage addition of any new feature to this class?
> >>>
> >>> I still prefer to add it to AdminClient (the Java version in the future
> >>> and the Scala version in the short term) because I feel it belongs with
> >>> admin operations instead of the KafkaConsumer interface. For example, if
> >>> in the future we implement the "delete data before committed offset"
> >>> strategy in an external service, I feel it is a bit awkward if the
> >>> service has to instantiate a KafkaConsumer and call
> >>> KafkaConsumer.purgeDataBefore(...) to purge data. In other words, our
> >>> expected use-case doesn't necessarily bind this API with the consumer.
> >>>
> >>> I don't feel strongly about this issue. Let's see what other
> >>> committers/developers think about this.
> >>>
> >>>
> >>> >
> >>> > >
> >>> > > Thanks,
> >>> > > Dong
> >>> > >
> >>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <e...@confluent.io>
> >>> > > wrote:
> >>> > >
> >>> > > > Dong,
> >>> > > >
> >>> > > > Looks like that's an internal link,
> >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
> >>> > > > is the right one.
> >>> > > >
> >>> > > > I have a question about one of the rejected alternatives:
> >>> > > >
> >>> > > > > Using committed offset instead of an extra API to trigger data
> >>> > > > > purge operation.
> >>> > > >
> >>> > > > The KIP says this would be more complicated to implement. Why is
> >>> > > > that? I think brokers would have to consume the entire offsets
> >>> > > > topic, but the data stored in memory doesn't seem to change, and
> >>> > > > applying this when updated offsets are seen seems basically the
> >>> > > > same. It might also be possible to make it work even with multiple
> >>> > > > consumer groups if that was desired (although that'd require
> >>> > > > tracking more data in memory) as a generalization without requiring
> >>> > > > coordination between the consumer groups. Given the motivation, I'm
> >>> > > > assuming this was considered unnecessary since this specifically
> >>> > > > targets intermediate stream processing topics.
> >>> > > >
> >>> > > > Another question is why expose this via AdminClient (which isn't a
> >>> > > > public API afaik)? Why not, for example, expose it on the Consumer,
> >>> > > > which is presumably where you'd want access to it since the
> >>> > > > functionality depends on the consumer actually having consumed the
> >>> > > > data?
> >>> > > >
> >>> > > > -Ewen
> >>> > > >
> >>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>> > > >
> >>> > > > > Hi all,
> >>> > > > >
> >>> > > > > We created KIP-107 to propose addition of purgeDataBefore() API
> >>> in
> >>> > > > > AdminClient.
> >>> > > > >
> >>> > > > > Please find the KIP wiki in the link
> >>> > > > > https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal.
> >>> > > > > We would love to hear your comments and suggestions.
> >>> > > > >
> >>> > > > > Thanks,
> >>> > > > > Dong
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >
>
