Becket,

I put together a commit with a timestamp-based deletion policy, at
https://github.com/apache/kafka/commit/2c51ae3cead99432ebf19f0303f8cc797723c939
Is this a small enough change that you'd be comfortable incorporating it
into your work on KIP-32, or do I need to open a separate KIP?

Thanks,
Bill Warshaw

On Mon, Feb 1, 2016 at 12:35 PM, Becket Qin <becket....@gmail.com> wrote:

> Hi Bill,
>
> The PR is still under review. It might take some more time because it
> touches a bunch of files. You can watch KAFKA-3025 so that you get an
> email notification once it is closed.
> Looking forward to your tool.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Feb 1, 2016 at 6:54 AM, Bill Warshaw <bill.wars...@appian.com>
> wrote:
>
> > Becket,
> >
> > I took a look at KIP-32 and your PR for it.  This looks like something
> > that would be great to build off of; I'm envisioning a timestamp-based
> > policy where the client application sets a minimum timestamp, before
> > which everything can be deleted / compacted.  How far along is this pull
> > request?
> >
> > Bill Warshaw
> >
> > On Fri, Jan 22, 2016 at 12:41 AM, Becket Qin <becket....@gmail.com>
> wrote:
> >
> > > I agree with Guozhang that this seems better as a separate tool.
> > >
> > > Also, I am wondering if KIP-32 can be used here. We can have a
> > > timestamp-based compaction policy if needed, for example, keep any
> > > message whose timestamp is greater than (MaxTimestamp - 24 hours).
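> > >
> > > For instance (an illustrative sketch only, not code from the actual PR),
> > > the retention predicate could be as simple as:
> > >
> > >   // keep a message iff it is newer than 24 hours before the max timestamp
> > >   def shouldRetain(recordTimestampMs: Long, maxTimestampMs: Long): Boolean =
> > >     recordTimestampMs > maxTimestampMs - 24 * 60 * 60 * 1000L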
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > > On Thu, Jan 21, 2016 at 4:35 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Bill,
> > > >
> > > > For your case: since once the log is cleaned up to the given offset
> > > > watermark (or threshold, whatever the name is), future cleaning with
> > > > the same watermark will effectively be a no-op, I feel your scenario
> > > > would be a better fit for a one-time admin tool that cleans up the logs
> > > > rather than a customized periodic cleaning policy. Does this sound
> > > > reasonable to you?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Wed, Jan 20, 2016 at 7:09 PM, Bill Warshaw <
> bill.wars...@appian.com
> > >
> > > > wrote:
> > > >
> > > > > For our particular use case, we would need to.  This proposal is
> > > > > really two separate pieces: custom log compaction policy, and the
> > > > > ability to set arbitrary key-value pairs in a Topic configuration.
> > > > >
> > > > > I believe that Kafka's current behavior of throwing errors when it
> > > > > encounters configuration keys that aren't defined is meant to help
> > > > > users not misconfigure their configuration files.  If that is the sole
> > > > > motivation for it, I would propose adding a property namespace, and
> > > > > allowing users to configure arbitrary properties behind that particular
> > > > > namespace, while still enforcing strict parsing for all other
> > > > > properties.
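> > > > >
> > > > > For example (purely illustrative; neither the prefix nor the key below
> > > > > is an existing Kafka config), a topic's config could then look like:
> > > > >
> > > > >   cleanup.policy=compact              # existing key, strictly parsed
> > > > >   custom.min.transaction.id=12345     # passed through under the "custom." namespace
> > > > >
> > > > > Unknown keys outside the reserved namespace would still be rejected.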
> > > > >
> > > > > On Wed, Jan 20, 2016 at 9:23 PM, Guozhang Wang <wangg...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > So do you need to periodically update the key-value pairs to
> > > > > > "advance the threshold for each topic"?
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Wed, Jan 20, 2016 at 5:51 PM, Bill Warshaw <
> > > bill.wars...@appian.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Compaction would be performed in the same manner as it is
> > > > > > > currently.  There is a predicate applied in the
> > > > > > > "shouldRetainMessage" function in LogCleaner; ultimately we just
> > > > > > > want to be able to swap a custom implementation of that particular
> > > > > > > method in.  Nothing else in the compaction codepath would need to
> > > > > > > change.
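> > > > > > >
> > > > > > > Roughly what we have in mind (a sketch only; the trait name, config
> > > > > > > key, and wiring are illustrative, not anything in Kafka today):
> > > > > > >
> > > > > > >   trait MessageRetentionPolicy {
> > > > > > >     // return true to keep the message, false to let compaction drop it
> > > > > > >     def shouldRetainMessage(topicConfig: Map[String, String],
> > > > > > >                             key: Array[Byte],
> > > > > > >                             value: Array[Byte]): Boolean
> > > > > > >   }
> > > > > > >
> > > > > > >   // drops every message whose key (a transaction id) is below a
> > > > > > >   // threshold read from the topic config
> > > > > > >   class TransactionIdRetentionPolicy extends MessageRetentionPolicy {
> > > > > > >     override def shouldRetainMessage(topicConfig: Map[String, String],
> > > > > > >                                      key: Array[Byte],
> > > > > > >                                      value: Array[Byte]): Boolean = {
> > > > > > >       val threshold = topicConfig.getOrElse("custom.min.transaction.id", "0").toLong
> > > > > > >       new String(key, "UTF-8").toLong >= threshold
> > > > > > >     }
> > > > > > >   }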
> > > > > > >
> > > > > > > For advancing the "threshold transaction_id", ideally we would be
> > > > > > > able to set arbitrary key-value pairs on the topic configuration.
> > > > > > > We have access to the topic configuration during log compaction, so
> > > > > > > a custom policy class would also have access to that config, and
> > > > > > > could read anything we stored in there.
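> > > > > > >
> > > > > > > Concretely, after each checkpoint the client would run something
> > > > > > > like the following (the config key is hypothetical, and today this
> > > > > > > call would be rejected because the key isn't a recognized config):
> > > > > > >
> > > > > > >   bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
> > > > > > >     --topic tx-log --config custom.min.transaction.id=42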
> > > > > > >
> > > > > > > On Wed, Jan 20, 2016 at 8:14 PM, Guozhang Wang <
> > wangg...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello Bill,
> > > > > > > >
> > > > > > > > Just to clarify your use case: is your "log compaction" executed
> > > > > > > > manually, or is it triggered periodically like the current by-key
> > > > > > > > log cleaning? If it is the latter, how will you advance the
> > > > > > > > "threshold transaction_id" each time it executes?
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Jan 20, 2016 at 1:50 PM, Bill Warshaw <
> > > > > bill.wars...@appian.com
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Damian, I appreciate your quick response.
> > > > > > > > >
> > > > > > > > > Our transaction_id is incrementing for each transaction, so we
> > > > > > > > > will only ever have one message in Kafka with a given
> > > > > > > > > transaction_id.  We thought about using a rolling counter that is
> > > > > > > > > incremented on each checkpoint as the key, and manually
> > > > > > > > > triggering compaction after the checkpoint is complete, but our
> > > > > > > > > checkpoints are asynchronous.  This means that we would have a
> > > > > > > > > set of messages appended to the log after the checkpoint started,
> > > > > > > > > with value of the previous key + 1, that would also be compacted
> > > > > > > > > down to a single entry.
> > > > > > > > >
> > > > > > > > > Our particular custom policy would delete all messages whose key
> > > > > > > > > was less than a given transaction_id that we passed in.  I can
> > > > > > > > > imagine a wide variety of other custom policies that could be
> > > > > > > > > used for retention based on the key and value of the message.
> > > > > > > > >
> > > > > > > > > On Wed, Jan 20, 2016 at 1:35 PM, Bill Warshaw <
> > > > > > bill.wars...@appian.com
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hello,
> > > > > > > > > >
> > > > > > > > > > I'm working on a team that is starting to use Kafka as a
> > > > > > > > > > distributed transaction log for a set of in-memory databases
> > > > > > > > > > which can be replicated across nodes.  We decided to use Kafka
> > > > > > > > > > instead of Bookkeeper for a variety of reasons, but there are a
> > > > > > > > > > couple spots where Kafka is not a perfect fit.
> > > > > > > > > >
> > > > > > > > > > The biggest issue facing us is deleting old transactions from
> > > > > > > > > > the log after checkpointing the database.  We can't use any of
> > > > > > > > > > the built-in size or time-based deletion mechanisms efficiently,
> > > > > > > > > > because we could get ourselves into a dangerous state where
> > > > > > > > > > we're deleting transactions that haven't been checkpointed yet.
> > > > > > > > > > The current approach we're looking at is rolling a new topic
> > > > > > > > > > each time we checkpoint, and deleting the old topic once all
> > > > > > > > > > replicas have consumed everything in it.
> > > > > > > > > >
> > > > > > > > > > Another idea we came up with is using a pluggable compaction
> > > > > > > > > > policy; we would set the message key as the offset or
> > > > > > > > > > transaction id, and the policy would delete all messages with a
> > > > > > > > > > key smaller than that id.
> > > > > > > > > > I took a stab at implementing the hook in Kafka for pluggable
> > > > > > > > > > compaction policies at
> > > > > > > > > > https://github.com/apache/kafka/compare/trunk...bill-warshaw:pluggable_compaction_policy
> > > > > > > > > > (rough implementation), and it seems fairly straightforward.
> > > > > > > > > > One problem that we run into is that the custom policy class can
> > > > > > > > > > only access information that is defined in the configuration,
> > > > > > > > > > and the configuration doesn't allow custom key-value pairs; if
> > > > > > > > > > we wanted to pass it information dynamically, we'd have to use
> > > > > > > > > > some hack like calling Zookeeper from within the class.
> > > > > > > > > > To get around this, my best idea is to add the ability to
> > > > > > > > > > specify arbitrary key-value pairs in the configuration, that our
> > > > > > > > > > client could use to pass information to the custom policy.  Does
> > > > > > > > > > this set off any alarm bells for you guys?  If so, are there
> > > > > > > > > > other approaches we could take that come to mind?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Thanks for your time,
> > > > > > > > > > Bill Warshaw
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
>

