For our particular use case, yes, we would need to periodically update
them.  This proposal is really two separate pieces: a custom log
compaction policy, and the ability to set arbitrary key-value pairs in a
topic configuration.

I believe that Kafka's current behavior of throwing errors when it
encounters undefined configuration keys is meant to protect users from
misconfiguring their configuration files.  If that is the sole motivation,
I would propose adding a property namespace and allowing users to set
arbitrary properties under that namespace, while still enforcing strict
parsing for all other properties.

On Wed, Jan 20, 2016 at 9:23 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> So do you need to periodically update the key-value pairs to "advance the
> threshold for each topic"?
>
> Guozhang
>
> On Wed, Jan 20, 2016 at 5:51 PM, Bill Warshaw <bill.wars...@appian.com>
> wrote:
>
> > Compaction would be performed in the same manner as it is currently.
> > There is a predicate applied in the "shouldRetainMessage" function in
> > LogCleaner; ultimately we just want to be able to swap in a custom
> > implementation of that particular method.  Nothing else in the
> > compaction codepath would need to change.
> >
> > For advancing the "threshold transaction_id", ideally we would be able
> > to set arbitrary key-value pairs on the topic configuration.  We have
> > access to the topic configuration during log compaction, so a custom
> > policy class would also have access to that config, and could read
> > anything we stored in there.
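> >
> > As a rough sketch (interface and method names are hypothetical, not an
> > existing Kafka API), the hook could look something like:
> >
> >   import java.util.Properties;
> >
> >   // hypothetical plugin point, consulted from LogCleaner's
> >   // shouldRetainMessage for each message considered during compaction
> >   public interface MessageRetentionPolicy {
> >       // topicConfig carries the topic's configuration, including any
> >       // custom key-value pairs we were able to store there
> >       boolean shouldRetain(byte[] key, byte[] value, Properties topicConfig);
> >   }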
> >
> > On Wed, Jan 20, 2016 at 8:14 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hello Bill,
> > >
> > > Just to clarify your use case: is your "log compaction" executed
> > > manually, or is it triggered periodically like the current by-key log
> > > cleaning?  If it is the latter case, how will you advance the
> > > "threshold transaction_id" each time it executes?
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, Jan 20, 2016 at 1:50 PM, Bill Warshaw <bill.wars...@appian.com>
> > > wrote:
> > >
> > > > Damian, I appreciate your quick response.
> > > >
> > > > Our transaction_id increments with each transaction, so we will only
> > > > ever have one message in Kafka with a given transaction_id.  We
> > > > thought about using a rolling counter, incremented on each
> > > > checkpoint, as the key, and manually triggering compaction after the
> > > > checkpoint completes, but our checkpoints are asynchronous.  This
> > > > means we would have a set of messages, appended to the log after the
> > > > checkpoint started and keyed with the previous counter + 1, that
> > > > would also be compacted down to a single entry.
> > > >
> > > > Our particular custom policy would delete all messages whose key was
> > > > less than a given transaction_id that we passed in.  I can imagine a
> > > > wide variety of other custom policies that could be used for
> > > > retention based on the key and value of the message.
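> > > >
> > > > For concreteness, the retention predicate would boil down to
> > > > something like the following (the config key and the key encoding
> > > > are hypothetical sketches, not settled details):
> > > >
> > > >   import java.nio.charset.StandardCharsets;
> > > >   import java.util.Properties;
> > > >
> > > >   public class TransactionIdRetention {
> > > >       // retain a message only if its transaction_id has not yet
> > > >       // been covered by a completed checkpoint
> > > >       static boolean shouldRetain(byte[] key, Properties topicConfig) {
> > > >           long threshold = Long.parseLong(
> > > >               topicConfig.getProperty("custom.compaction.threshold.txid", "0"));
> > > >           long txId = Long.parseLong(new String(key, StandardCharsets.UTF_8));
> > > >           return txId >= threshold;
> > > >       }
> > > >   }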
> > > >
> > > > On Wed, Jan 20, 2016 at 1:35 PM, Bill Warshaw <bill.wars...@appian.com>
> > > > wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I'm working on a team that is starting to use Kafka as a
> > > > > distributed transaction log for a set of in-memory databases that
> > > > > can be replicated across nodes.  We decided to use Kafka instead
> > > > > of BookKeeper for a variety of reasons, but there are a couple of
> > > > > spots where Kafka is not a perfect fit.
> > > > >
> > > > > The biggest issue facing us is deleting old transactions from the
> > > > > log after checkpointing the database.  We can't safely use any of
> > > > > the built-in size- or time-based deletion mechanisms, because we
> > > > > could get ourselves into a dangerous state where we're deleting
> > > > > transactions that haven't been checkpointed yet.  The current
> > > > > approach we're looking at is rolling a new topic each time we
> > > > > checkpoint, and deleting the old topic once all replicas have
> > > > > consumed everything in it.
> > > > >
> > > > > Another idea we came up with is using a pluggable compaction
> > > > > policy: we would set the message key to the offset or
> > > > > transaction_id, and the policy would delete all messages with a
> > > > > key smaller than that id.
> > > > >
> > > > > I took a stab at implementing the hook in Kafka for pluggable
> > > > > compaction policies at
> > > > > https://github.com/apache/kafka/compare/trunk...bill-warshaw:pluggable_compaction_policy
> > > > > (rough implementation), and it seems fairly straightforward.  One
> > > > > problem we run into is that the custom policy class can only
> > > > > access information that is defined in the configuration, and the
> > > > > configuration doesn't allow custom key-value pairs; if we wanted
> > > > > to pass it information dynamically, we'd have to use some hack
> > > > > like calling ZooKeeper from within the class.
> > > > >
> > > > > To get around this, my best idea is to add the ability to specify
> > > > > arbitrary key-value pairs in the configuration that our client
> > > > > could use to pass information to the custom policy.  Does this set
> > > > > off any alarm bells for you guys?  If so, are there other
> > > > > approaches that come to mind?
> > > > >
> > > > >
> > > > > Thanks for your time,
> > > > > Bill Warshaw
> > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
