Responding to some of the earlier comments in the thread:

@Jay/@Neha,

I think any one of onCommit/onAppend/onArrival would work for the concrete
use-case that I had outlined. I think onArrival is additionally useful for
custom validation - i.e., reject the message and do not append it if it
violates some cluster-specific rule (e.g., if some header timestamp is
older than xyz). However, the catch with user-supplied validation is that we
would have to make do with a (new) generic error code in the producer response.
While there is a risk of a broker interceptor having high latency I think
that is acceptable since it is the user's responsibility to ensure low
latency - the producer call-back and onAcknowledgement interceptor are
similar in this regard although those are less risky. Even so, I think
there are clear use-cases for broker interceptors so I feel the risk part
is something that just needs to be documented. @Jay that is a good point
about moving from Message/MessageSet to Records although that may be less
difficult to absorb since it is a broker-side interceptor and so people
don't need to get a ton of applications in their company to switch to use
it.
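
To make the onArrival validation idea concrete, here is a rough sketch. All
of the names in it (ArrivedMessage, ArrivalInterceptor, MaxAgeValidator) are
hypothetical stand-ins that I made up for illustration; there is no
broker-side interceptor API in Kafka today, and the rejection reason string
stands in for whatever generic error code we would end up returning.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Hypothetical view of a message as a broker-side interceptor might see it. */
final class ArrivedMessage {
    final String topic;
    final long headerTimestampMs;

    ArrivedMessage(String topic, long headerTimestampMs) {
        this.topic = topic;
        this.headerTimestampMs = headerTimestampMs;
    }
}

/** Hypothetical broker hook; not part of any existing Kafka API. */
interface ArrivalInterceptor {
    /** Empty result means "append"; otherwise the string is the rejection reason. */
    Optional<String> onArrival(ArrivedMessage message, Instant now);
}

/** Rejects messages whose header timestamp is older than a configured maximum age. */
final class MaxAgeValidator implements ArrivalInterceptor {
    private final Duration maxAge;

    MaxAgeValidator(Duration maxAge) {
        this.maxAge = maxAge;
    }

    @Override
    public Optional<String> onArrival(ArrivedMessage message, Instant now) {
        Instant stamped = Instant.ofEpochMilli(message.headerTimestampMs);
        // A message exactly maxAge old is still accepted; only strictly older ones are rejected.
        if (stamped.isBefore(now.minus(maxAge))) {
            return Optional.of("header timestamp older than " + maxAge);
        }
        return Optional.empty();
    }
}
```

The check itself is cheap, which is the property the user would be
responsible for preserving in any real implementation.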

Re: onEnqueued: monitoring serialization latency can be done via metrics,
so this callback is more useful for recording whether serialization
succeeded or not. onAcknowledgement covers this too, but it also reports
other possible errors (such as produce errors). onEnqueued is more
fine-grained than most people need though (i.e., I don't think we will use
it even if it is present).
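
As a rough illustration of getting the same signal out of the
acknowledgement callback, here is a sketch that separates serialization
failures from other produce errors by exception type. The types below
(AckMetadata, SerializationFailure, OutcomeCounter) are stand-ins I made up
so the example is self-contained; only the callback shape
(metadata, exception) follows the KIP, and whether serialization failures
actually reach this callback as a distinguishable exception is an assumption.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Stand-in for RecordMetadata; only what this sketch needs. */
final class AckMetadata {
    final String topic;
    final int partition;
    final long offset;

    AckMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

/** Hypothetical marker for a serialization failure (an assumption of this sketch). */
final class SerializationFailure extends RuntimeException {
    SerializationFailure(String message) {
        super(message);
    }
}

/** Counts outcomes seen by a callback shaped like onAcknowledgement(metadata, exception). */
final class OutcomeCounter {
    private final AtomicLong succeeded = new AtomicLong();
    private final AtomicLong serializationFailed = new AtomicLong();
    private final AtomicLong otherFailed = new AtomicLong();

    void onAcknowledgement(AckMetadata metadata, Exception exception) {
        if (exception == null) {
            succeeded.incrementAndGet();
        } else if (exception instanceof SerializationFailure) {
            serializationFailed.incrementAndGet();
        } else {
            // Produce errors and anything else end up in the same bucket.
            otherFailed.incrementAndGet();
        }
    }

    long succeeded() { return succeeded.get(); }
    long serializationFailed() { return serializationFailed.get(); }
    long otherFailed() { return otherFailed.get(); }
}
```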

Re: checksums: I think it is a good addition to the metadata, and for
low-volume or super-critical topics it can be used for very strict auditing.
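
For what it's worth, the kind of strict audit I have in mind boils down to
comparing a checksum recorded on the producer side with one recorded on the
consumer side. The sketch below uses java.util.zip.CRC32 over a plain byte
array purely for illustration; ChecksumAuditSketch and its method names are
made up, and the real record CRC covers more than the payload (see Becket's
point 3 below), so treat this as the idea rather than the mechanism.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Illustrative only: compares a producer-side checksum with consumer-side bytes. */
final class ChecksumAuditSketch {

    /** Computes a standard CRC-32 over the given bytes. */
    static long crc32(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }

    /** True if the bytes seen by the consumer hash to the checksum the producer recorded. */
    static boolean matches(long producerChecksum, byte[] consumedBytes) {
        return producerChecksum == crc32(consumedBytes);
    }

    public static void main(String[] args) {
        byte[] sent = "order-1234".getBytes(StandardCharsets.UTF_8);
        long recorded = crc32(sent);

        byte[] received = "order-1234".getBytes(StandardCharsets.UTF_8);
        System.out.println("audit ok: " + matches(recorded, received));
    }
}
```

Since the checksum is only 32 bits, it would have to be paired with
something like topic/partition/offset to identify a message; on its own it
only summarizes content.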

There are a couple of typos/edits for the wiki itself:

   - Under Kafka Producer changes:
      - you have references to KafkaConsumer constructor and
      ConsumerConfig.originals.
      - sendRecord -> sentRecord (may be clearer)
   - Under ProducerInterceptor interface: there is a mention of onEnqueued
   which was rejected
   - Comment for ConsumerRecord.record should probably be: // NEW: record
   size in bytes (*after decompression*)


BTW - Anna, nice work on the KIP!

Joel

On Fri, Jan 29, 2016 at 6:57 AM, Neha Narkhede <n...@confluent.io> wrote:

> Becket,
>
> Is your concern the presence of CRC in the RecordMetadata or do you want to
> brainstorm how CRC can be used for auditing? I think we shouldn't try to
> think about the various ways that people can do monitoring using
> interceptors and the metadata we provide. The entire point of having
> pluggable interceptors is so that people can employ their own creative
> mechanisms to make use of interceptors.
>
> I do think that it is worth discussing whether or not CRC makes sense as
> record metadata to the user. My take is that the CRC is the best size-bound
> summary of serialized record content available to us which is expensive to
> recompute if the user were to redo it. I'd argue this summary of a record
> qualifies as its metadata. After all, we use the record CRC for a very
> important test of the system durability as it travels through the system.
>
> > 1. Isn't the TopicPartition + Offset already uniquely identified a message?
> > It seems better than CRC no matter from summary point of view or auditing
> > point of view.
>
>
> The offset is a system-assigned value of uniqueness to the message. If you
> trusted the system that much, you are not looking to monitor it out-of-band
> :-)
>
>
> > 2. Currently CRC only has 4 bytes. So it will have collision when there
> > are more than ~4 billion messages. Take LinkedIn as an example, we have 1.3
> > trillion messages per day. So there will be at least a couple of hundreds
> > collision for each CRC code every day, whereas TopicPartition+Offset will
> > not have any collision.
>
>
> The CRC isn't sent over the wire and doesn't add any extra overhead in
> processing, so what is your concern? If you aren't convinced about its
> usefulness, you can always use the default do-nothing interceptor at
> LinkedIn and ignore the CRC.
>
> > Without having
> > the entire message bytes, they may not be able to verify its correctness,
> > and the CRC could even be invalid if the broker ever overwritten any field
> > or did format conversion.
> >
>
> This doesn't make sense to me. The CRC is used for the most important
> durability check by Kafka - to verify that the message was not garbled over
> the wire. The system can't change it; it has to match on the consumer side
> or we will not return it to the user.
>
> On Fri, Jan 29, 2016 at 3:23 AM, Becket Qin <becket....@gmail.com> wrote:
>
> > Anna,
> >
> > It is still not clear to me why we should expose CRC to the end user.
> > Here are the points I am confused about.
> >
> > 1. Doesn't the TopicPartition + Offset already uniquely identify a message?
> > It seems better than CRC from both the summary and the auditing point of
> > view.
> >
> > 2. Currently CRC only has 4 bytes, so there will be collisions once there
> > are more than ~4 billion messages. Take LinkedIn as an example: we have 1.3
> > trillion messages per day, so there will be at least a couple of hundred
> > collisions for each CRC value every day, whereas TopicPartition+Offset will
> > not have any collisions.
> >
> > 3. CRC is calculated after all the fields have been filled in by the
> > producer, including timestamp, attributes, etc. It might also get
> > recomputed on the broker side. So if users only get the CRC from record
> > metadata, without having the entire message bytes, they may not be able to
> > verify its correctness, and the CRC could even be invalid if the broker ever
> > overwrote any field or did format conversion.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >
> > On Thu, Jan 28, 2016 at 5:58 PM, Anna Povzner <a...@confluent.io> wrote:
> >
> > > > On second thought, yes, I think we should expose record size that
> > > represents application bytes. This is Becket's option #1.
> > >
> > > I updated the KIP wiki with new fields in RecordMetadata and
> > > ConsumerRecord.
> > >
> > > I would like to start a voting thread tomorrow if there are no
> objections
> > > or more things to discuss.
> > >
> > > Thanks,
> > > Anna
> > >
> > >
> > >
> > > On Thu, Jan 28, 2016 at 2:25 PM, Anna Povzner <a...@confluent.io>
> wrote:
> > >
> > > > Regarding record size as bytes sent over the wire, my concern is that
> > it
> > > > is almost impossible to calculate per-record. We could do as: 1)
> > > compressed
> > > > bytes / number of records in a compressed message, as Todd mentioned;
> > or
> > > 2)
> > > > or same as #1 but take it proportional to uncompressed record size
> vs.
> > > > total uncompressed size of records. All of these calculations give us
> > an
> > > > estimate. So maybe record size as bytes sent over the wire is not a
> > > > per-record metadata, but rather per topic/partition measure that is
> > > better
> > > > to be exposed through metrics?
> > > >
> > > >
> > > > On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino <tpal...@gmail.com>
> > wrote:
> > > >
> > > >> It may be difficult (or nearly impossible) to get actual compressed
> > > bytes
> > > >> for a message from a compressed batch, but I do think it’s useful
> > > >> information to have available for the very reason noted, bandwidth
> > > >> consumed. Does it make sense to have an interceptor at the batch
> level
> > > >> that
> > > >> can provide this? The other option is to estimate it (such as making
> > an
> > > >> assumption that the messages in a batch are equal in size, which is
> > not
> > > >> necessarily true), which is probably not the right answer.
> > > >>
> > > >> -Todd
> > > >>
> > > >>
> > > >> On Thu, Jan 28, 2016 at 1:48 PM, Anna Povzner <a...@confluent.io>
> > > wrote:
> > > >>
> > > >> > Hi Becket,
> > > >> >
> > > >> > It will be up to the interceptor to implement their audit or
> > > monitoring
> > > >> > strategy. I would also imagine there is more than one good way to
> do
> > > >> audit.
> > > >> > So, I agree that some of the interceptors may not use CRC, and we
> > will
> > > >> not
> > > >> > require it. The question is now whether intercepting CRCs is
> > needed. I
> > > >> > think they are very useful for monitoring and audit, because CRC
> > > >> provides
> > > >> > an easy way to get a summary of a message, rather than using
> > message
> > > >> > bytes or key/value objects.
> > > >> >
> > > >> > Regarding record size, I agree that bandwidth example was not a
> good
> > > >> one. I
> > > >> > think it would be hard to get actual bytes sent over the wire
> (your
> > > #2),
> > > >> > since multiple records get compressed together and we would need
> to
> > > >> decide
> > > >> > which bytes to account to which record. So I am inclined to only
> do
> > > your
> > > >> > #1. However, it still makes more sense to me just to return record
> > > size
> > > >> > including the header, since this is the actual record size.
> > > >> >
> > > >> > Thanks,
> > > >> > Anna
> > > >> >
> > > >> > On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin <
> becket....@gmail.com>
> > > >> wrote:
> > > >> >
> > > >> > > Anna,
> > > >> > >
> > > >> > > Using CRC to do end2end auditing might be very costly because
> you
> > > will
> > > >> > need
> > > >> > > to collect all the CRC from both producer and consumer. And it
> is
> > > >> based
> > > >> > on
> > > >> > > the assumption that broker does not modify the record.
> > > >> > > Can you shed some light on how end to end auditing will be using
> > the
> > > >> CRC
> > > >> > > before we decide to expose such low level detail to the end
> user?
> > It
> > > >> > would
> > > >> > > also be helpful if you can compare it with something like
> sequence
> > > >> number
> > > >> > > based auditing.
> > > >> > >
> > > >> > > About the record size, one thing worth noting is that the size
> of
> > > >> Record
> > > >> > is
> > > >> > > not the actual bytes sent over the wire if we use compression.
> So
> > > that
> > > >> > does
> > > >> > > not really tell user how much bandwidth they are using.
> > Personally I
> > > >> > think
> > > >> > > two kinds of size may be useful.
> > > >> > > 1. The record size after serialization, i.e. application bytes.
> > (The
> > > >> > > uncompressed record size can be easily derived as well)
> > > >> > > 2. The actual bytes sent over the wire.
> > > >> > > We can get (1) easily, but (2) is difficult to get at Record
> level
> > > >> when
> > > >> > we
> > > >> > > use compression.
> > > >> > >
> > > >> > > Thanks,
> > > >> > >
> > > >> > > Jiangjie (Becket) Qin
> > > >> > >
> > > >> > > On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner <
> a...@confluent.io
> > >
> > > >> > wrote:
> > > >> > >
> > > >> > > > Hi Becket,
> > > >> > > >
> > > >> > > > The use-case for CRC is end-to-end audit, rather than checking
> > > >> whether
> > > >> > a
> > > >> > > > single message is corrupt or not.
> > > >> > > >
> > > >> > > > Regarding record size, I was thinking to extract record size
> > from
> > > >> > Record.
> > > >> > > > That will include header overhead as well. I think total
> record
> > > size
> > > >> > will
> > > >> > > > tell users how much bandwidth their messages take. Since
> header
> > is
> > > >> > > > relatively small and constant, users also will get an idea of
> > > their
> > > >> > > > key/value sizes.
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > > Anna
> > > >> > > >
> > > >> > > > On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin <
> > becket....@gmail.com
> > > >
> > > >> > > wrote:
> > > >> > > >
> > > >> > > > > I am +1 on #1.2 and #3.
> > > >> > > > >
> > > >> > > > > #2: Regarding CRC, I am not sure if users care about CRC. Is
> > > there
> > > >> > any
> > > >> > > > > specific use case? Currently we validate messages by calling
> > > >> > > > ensureValid()
> > > >> > > > > to verify the checksum and throw exception if it does not
> > match.
> > > >> > > > >
> > > >> > > > > Message size would be useful. We can add that to
> > ConsumerRecord.
> > > >> Can
> > > >> > > you
> > > >> > > > > clarify the message size you are referring to? Does it
> include
> > > the
> > > >> > > > message
> > > >> > > > > header overhead or not? From user's point of view, they
> > probably
> > > >> > don't
> > > >> > > > care
> > > >> > > > > about header size.
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > >
> > > >> > > > > Jiangjie (Becket) Qin
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede <
> > > n...@confluent.io
> > > >> >
> > > >> > > > wrote:
> > > >> > > > >
> > > >> > > > > > Anna,
> > > >> > > > > >
> > > >> > > > > > Thanks for being diligent.
> > > >> > > > > >
> > > >> > > > > > +1 on #1.2 and sounds good on #3. I recommend adding
> > checksum
> > > >> and
> > > >> > > size
> > > >> > > > > > fields to RecordMetadata and ConsumerRecord instead of
> > > exposing
> > > >> > > > metadata
> > > >> > > > > > piecemeal in the interceptor APIs.
> > > >> > > > > >
> > > >> > > > > > Thanks,
> > > >> > > > > > Neha
> > > >> > > > > >
> > > >> > > > > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner <
> > > >> a...@confluent.io>
> > > >> > > > wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi All,
> > > >> > > > > > >
> > > >> > > > > > > The KIP wiki page is now up-to-date with the scope we
> have
> > > >> agreed
> > > >> > > on:
> > > >> > > > > > > Producer and Consumer Interceptors with a minimal set of
> > > >> mutable
> > > >> > > API
> > > >> > > > > that
> > > >> > > > > > > are not dependent on producer and consumer internal
> > > >> > implementation.
> > > >> > > > > > >
> > > >> > > > > > > I have a few more API details that I would like to bring
> > > >> attention
> > > >> > to
> > > >> > > > > > or/and
> > > >> > > > > > > discuss:
> > > >> > > > > > >
> > > >> > > > > > > 1. Handling exceptions
> > > >> > > > > > >
> > > >> > > > > > > Exceptions can provide an additional level of control.
> For
> > > >> > example,
> > > >> > > > we
> > > >> > > > > > can
> > > >> > > > > > > filter messages on consumer side or stop messages on
> > > producer
> > > >> if
> > > >> > > they
> > > >> > > > > > don’t
> > > >> > > > > > > have the right field.
> > > >> > > > > > >
> > > >> > > > > > > I see two options:
> > > >> > > > > > > 1.1. For callbacks that can mutate records (onSend and
> > > >> > onConsume),
> > > >> > > > > > > propagate exceptions through the original calls
> > > >> > > (KafkaProducer.send()
> > > >> > > > > and
> > > >> > > > > > > KafkaConsumer.poll() respectively). For other callbacks,
> > > catch
> > > >> > > > > exception,
> > > >> > > > > > > log, and ignore.
> > > >> > > > > > > 1.2. Catch exceptions from all the interceptor callbacks
> > and
> > > >> > > ignore.
> > > >> > > > > > >
> > > >> > > > > > > The issue with 1.1. is that it effectively changes
> > > >> > > > KafkaProducer.send()
> > > >> > > > > > and
> > > >> > > > > > > KafkaConsumer.poll() API, since now they may throw
> > > exceptions
> > > >> > that
> > > >> > > > are
> > > >> > > > > > not
> > > >> > > > > > > documented in KafkaProducer/Consumer API. Another option
> > is
> > > to
> > > >> > > allow
> > > >> > > > to
> > > >> > > > > > > propagate some exceptions, and ignore others.
> > > >> > > > > > >
> > > >> > > > > > > I think our use-cases do not require propagating
> > exceptions.
> > > >> So,
> > > >> > I
> > > >> > > > > > propose
> > > >> > > > > > > option 1.2. Unless someone has suggestion/use-cases for
> > > >> > propagating
> > > >> > > > > > > exceptions. Please let me know.
> > > >> > > > > > >
> > > >> > > > > > > 2. Intercepting record CRC and record size
> > > >> > > > > > >
> > > >> > > > > > > Since we decided not to add any intermediate callbacks
> > (such
> > > >> as
> > > >> > > > > onEnqueue
> > > >> > > > > > > or onReceive) to interceptors, I think it is still
> > valuable
> > > to
> > > >> > > > > intercept
> > > >> > > > > > > record CRC and record size in bytes for monitoring and
> > audit
> > > >> > > > use-cases.
> > > >> > > > > > >
> > > >> > > > > > > I propose to add checksum and size fields to
> > RecordMetadata
> > > >> and
> > > >> > > > > > > ConsumerRecord. Another option would be to add them as
> > > >> parameters
> > > >> > > in
> > > >> > > > > > > onAcknowledgement() and onConsume() callbacks.
> > > >> > > > > > >
> > > >> > > > > > > 3. Callbacks that allow to modify records look as
> follows:
> > > >> > > > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V>
> record);
> > > >> > > > > > > ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V>
> > > >> records);
> > > >> > > > > > >
> > > >> > > > > > > This means that interceptors can potentially modify
> > > >> > topic/partition
> > > >> > > > in
> > > >> > > > > > > ProducerRecord and topic/partition/offset in
> > > ConsumerRecord. I
> > > >> > > > propose
> > > >> > > > > > that
> > > >> > > > > > > it is up to the interceptor implementation to ensure
> that
> > > >> > > > > > topic/partition,
> > > >> > > > > > > etc is correct. KafkaProducer.send() will use topic,
> > > >> partition,
> > > >> > > key,
> > > >> > > > > and
> > > >> > > > > > > value from ProducerRecord returned from the onSend().
> > > >> Similarly,
> > > >> > > > > > > ConsumerRecords returned from KafkaConsumer.poll() would
> > be
> > > >> the
> > > >> > > ones
> > > >> > > > > > > returned from the interceptor.
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > Please let me know if you have any suggestions or
> > objections
> > > >> to
> > > >> > the
> > > >> > > > > > above.
> > > >> > > > > > >
> > > >> > > > > > > Thanks,
> > > >> > > > > > > Anna
> > > >> > > > > > >
> > > >> > > > > > >
> > > >> > > > > > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner <
> > > >> a...@confluent.io
> > > >> > >
> > > >> > > > > wrote:
> > > >> > > > > > >
> > > >> > > > > > > > Hi Mayuresh,
> > > >> > > > > > > >
> > > >> > > > > > > > I see why you would want to check for messages left in
> > the
> > > >> > > > > > > > RecordAccumulator. However, I don't think this will
> > > >> completely
> > > >> > > > solve
> > > >> > > > > > the
> > > >> > > > > > > > problem. Messages could be in-flight somewhere else,
> > like
> > > in
> > > >> > the
> > > >> > > > > > socket,
> > > >> > > > > > > or
> > > >> > > > > > > > there may be in-flight messages on the consumer side of
> > the
> > > >> > > > > MirrorMaker.
> > > >> > > > > > > So,
> > > >> > > > > > > > if we go the route of checking whether there are any
> > > >> in-flight
> > > >> > > > > messages
> > > >> > > > > > > for
> > > >> > > > > > > > topic deletion use-case, maybe it is better to count them
> > > with
> > > >> > > > onSend()
> > > >> > > > > > and
> > > >> > > > > > > > onAcknowledge() -- whether all messages sent were
> > > >> > acknowledged. I
> > > >> > > > > also
> > > >> > > > > > > > think that it would be better to solve this without
> > > >> > interceptors,
> > > >> > > > > such
> > > >> > > > > > as
> > > >> > > > > > > > fix error handling in this scenario. However, I do not
> > > have
> > > >> any
> > > >> > > > good
> > > >> > > > > > > > proposal right now, so these are just general
> thoughts.
> > > >> > > > > > > >
> > > >> > > > > > > > Thanks,
> > > >> > > > > > > > Anna
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <
> > > >> > > > > > > > gharatmayures...@gmail.com> wrote:
> > > >> > > > > > > >
> > > >> > > > > > > >> Calling producer.flush(), flushes all the data. So
> this
> > > is
> > > >> OK.
> > > >> > > But
> > > >> > > > > > when
> > > >> > > > > > > >> you
> > > >> > > > > > > >> are running Mirror maker, I am not sure there is a
> way
> > to
> > > >> > > flush()
> > > >> > > > > from
> > > >> > > > > > > >> outside.
> > > >> > > > > > > >>
> > > >> > > > > > > >>
> > > >> > > > > > > >> Thanks,
> > > >> > > > > > > >>
> > > >> > > > > > > >> Mayuresh
> > > >> > > > > > > >>
> > > >> > > > > > > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin <
> > > >> > > > becket....@gmail.com>
> > > >> > > > > > > >> wrote:
> > > >> > > > > > > >>
> > > >> > > > > > > >> > Mayuresh,
> > > >> > > > > > > >> >
> > > >> > > > > > > >> > Regarding your use case about mirror maker. Is it
> > good
> > > >> > enough
> > > >> > > as
> > > >> > > > > > long
> > > >> > > > > > > >> as we
> > > >> > > > > > > >> > know there is no message for the topic in the
> > producer
> > > >> > > anymore?
> > > >> > > > If
> > > >> > > > > > > that
> > > >> > > > > > > >> is
> > > >> > > > > > > >> > the case, call producer.flush() is sufficient.
> > > >> > > > > > > >> >
> > > >> > > > > > > >> > Thanks,
> > > >> > > > > > > >> >
> > > >> > > > > > > >> > Jiangjie (Becket) Qin
> > > >> > > > > > > >> >
> > > >> > > > > > > >> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <
> > > >> > > > > > > >> > gharatmayures...@gmail.com
> > > >> > > > > > > >> > > wrote:
> > > >> > > > > > > >> >
> > > >> > > > > > > >> > > Hi Anna,
> > > >> > > > > > > >> > >
> > > >> > > > > > > >> > > Thanks a lot for summarizing the discussion on
> this
> > > >> kip.
> > > >> > > > > > > >> > >
> > > >> > > > > > > >> > > It LGTM.
> > > >> > > > > > > >> > > This is really nice :
> > > >> > > > > > > >> > > We decided not to add any callbacks to producer
> and
> > > >> > consumer
> > > >> > > > > > > >> > > interceptors that will depend on internal
> > > >> implementation
> > > >> > as
> > > >> > > > part
> > > >> > > > > > of
> > > >> > > > > > > >> this
> > > >> > > > > > > >> > > KIP.
> > > >> > > > > > > >> > > *However, it is possible to add them later as
> part
> > of
> > > >> > > another
> > > >> > > > > KIP
> > > >> > > > > > if
> > > >> > > > > > > >> > there
> > > >> > > > > > > >> > > are good use-cases.*
> > > >> > > > > > > >> > >
> > > >> > > > > > > >> > > Do you agree with the use case I explained
> earlier
> > > for
> > > >> > > knowing
> > > >> > > > > the
> > > >> > > > > > > >> number
> > > >> > > > > > > >> > > of records left in the RecordAccumulator for a
> > > >> particular
> > > >> > > > topic.
> > > >> > > > > > It
> > > >> > > > > > > >> might
> > > >> > > > > > > >> > > be orthogonal to this KIP, but will be helpful.
> > What
> > > do
> > > >> > you
> > > >> > > > > think?
> > > >> > > > > > > >> > >
> > > >> > > > > > > >> > > Thanks,
> > > >> > > > > > > >> > >
> > > >> > > > > > > >> > > Mayuresh
> > > >> > > > > > > >> > >
> > > >> > > > > > > >> > >
> > > >> > > > > > > >> > > On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino <
> > > >> > > > tpal...@gmail.com
> > > >> > > > > >
> > > >> > > > > > > >> wrote:
> > > >> > > > > > > >> > >
> > > >> > > > > > > >> > > > This looks good. As noted, having one mutable
> > > >> > interceptor
> > > >> > > on
> > > >> > > > > > each
> > > >> > > > > > > >> side
> > > >> > > > > > > >> > > > allows for the use cases we can envision right
> > now,
> > > >> and
> > > >> > I
> > > >> > > > > think
> > > >> > > > > > > >> that’s
> > > >> > > > > > > >> > > > going to provide a great deal of opportunity
> for
> > > >> > > > implementing
> > > >> > > > > > > things
> > > >> > > > > > > >> > like
> > > >> > > > > > > >> > > > audit, especially within a multi-tenant
> > > environment.
> > > >> > > Looking
> > > >> > > > > > > >> forward to
> > > >> > > > > > > >> > > > getting this available in the clients.
> > > >> > > > > > > >> > > >
> > > >> > > > > > > >> > > > Thanks!
> > > >> > > > > > > >> > > >
> > > >> > > > > > > >> > > > -Todd
> > > >> > > > > > > >> > > >
> > > >> > > > > > > >> > > >
> > > >> > > > > > > >> > > > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner <
> > > >> > > > > > a...@confluent.io>
> > > >> > > > > > > >> > wrote:
> > > >> > > > > > > >> > > >
> > > >> > > > > > > >> > > > > Hi All,
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > Here are the meeting notes from today’s KIP
> meeting:
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > 1. We agreed to keep the scope of this KIP to
> > be
> > > >> > > producer
> > > >> > > > > and
> > > >> > > > > > > >> > consumer
> > > >> > > > > > > >> > > > > interceptors only. Broker-side interceptor
> will
> > > be
> > > >> > added
> > > >> > > > > later
> > > >> > > > > > > as
> > > >> > > > > > > >> a
> > > >> > > > > > > >> > > > > separate KIP. The reasons were already
> > mentioned
> > > in
> > > >> > this
> > > >> > > > > > thread,
> > > >> > > > > > > >> but
> > > >> > > > > > > >> > > the
> > > >> > > > > > > >> > > > > summary is:
> > > >> > > > > > > >> > > > >  * Broker interceptor is riskier and requires
> > > >> careful
> > > >> > > > > > > >> consideration
> > > >> > > > > > > >> > > about
> > > >> > > > > > > >> > > > > overheads, whether to intercept leaders vs.
> > > >> > > > > leaders/replicas,
> > > >> > > > > > > >> what to
> > > >> > > > > > > >> > > do
> > > >> > > > > > > >> > > > on
> > > >> > > > > > > >> > > > > leader failover and so on.
> > > >> > > > > > > >> > > > >  * Broker interceptors increase monitoring
> > > >> resolution,
> > > >> > > but
> > > >> > > > > not
> > > >> > > > > > > >> > > including
> > > >> > > > > > > >> > > > it
> > > >> > > > > > > >> > > > > in this KIP does not reduce usefulness of
> > > producer
> > > >> and
> > > >> > > > > > consumer
> > > >> > > > > > > >> > > > > interceptors that enable end-to-end
> monitoring
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > 2. We agreed to scope ProducerInterceptor and
> > > >> > > > > > > ConsumerInterceptor
> > > >> > > > > > > >> > > > callbacks
> > > >> > > > > > > >> > > > > to minimal set of mutable API that are not
> > > >> dependent
> > > >> > on
> > > >> > > > > > producer
> > > >> > > > > > > >> and
> > > >> > > > > > > >> > > > > consumer internal implementation.
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > ProducerInterceptor:
> > > >> > > > > > > >> > > > > *ProducerRecord<K, V>
> onSend(ProducerRecord<K,
> > V>
> > > >> > > > record);*
> > > >> > > > > > > >> > > > > *void onAcknowledgement(RecordMetadata
> > metadata,
> > > >> > > Exception
> > > >> > > > > > > >> > exception);*
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > ConsumerInterceptor:
> > > >> > > > > > > >> > > > > *ConsumerRecords<K, V>
> > > >> onConsume(ConsumerRecords<K, V>
> > > >> > > > > > > records);*
> > > >> > > > > > > >> > > > > *void onCommit(Map<TopicPartition,
> > > >> OffsetAndMetadata>
> > > >> > > > > > offsets);*
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > We will allow interceptors to modify
> > > >> ProducerRecord on
> > > >> > > > > > producer
> > > >> > > > > > > >> side,
> > > >> > > > > > > >> > > and
> > > >> > > > > > > >> > > > > modify ConsumerRecords on consumer side. This
> > > will
> > > >> > > support
> > > >> > > > > > > >> end-to-end
> > > >> > > > > > > >> > > > > monitoring and auditing and support the
> ability
> > > to
> > > >> add
> > > >> > > > > > metadata
> > > >> > > > > > > >> for a
> > > >> > > > > > > >> > > > > message. This will support Todd’s Auditing
> and
> > > >> Routing
> > > >> > > > > > > use-cases.
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > We did not find any use-case for modifying
> > > records
> > > >> in
> > > >> > > > > > > onConsume()
> > > >> > > > > > > >> > > > callback,
> > > >> > > > > > > >> > > > > but decided to enable modification of
> consumer
> > > >> records
> > > >> > > for
> > > >> > > > > > > >> symmetry
> > > >> > > > > > > >> > > with
> > > >> > > > > > > >> > > > > onSend().
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > 3. We agreed to ensure compatibility when/if
> we
> > > add
> > > >> > new
> > > >> > > > > > methods
> > > >> > > > > > > to
> > > >> > > > > > > >> > > > > ProducerInterceptor and ConsumerInterceptor
> by
> > > >> using
> > > >> > > > default
> > > >> > > > > > > >> methods
> > > >> > > > > > > >> > > with
> > > >> > > > > > > >> > > > > an empty implementation. Ok to assume Java 8.
> > > >> (This is
> > > >> > > > > > Ismael’s
> > > >> > > > > > > >> > method
> > > >> > > > > > > >> > > > #2).
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > 4. We decided not to add any callbacks to
> > > producer
> > > >> and
> > > >> > > > > > consumer
> > > >> > > > > > > >> > > > > interceptors that will depend on internal
> > > >> > implementation
> > > >> > > > as
> > > >> > > > > > part
> > > >> > > > > > > >> of
> > > >> > > > > > > >> > > this
> > > >> > > > > > > >> > > > > KIP. However, it is possible to add them
> later
> > as
> > > >> part
> > > >> > > of
> > > >> > > > > > > another
> > > >> > > > > > > >> KIP
> > > >> > > > > > > >> > > if
> > > >> > > > > > > >> > > > > there are good use-cases.
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > *Reasoning.* We did not have concrete
> use-cases
> > > >> that
> > > >> > > > > justified
> > > >> > > > > > > >> more
> > > >> > > > > > > >> > > > methods
> > > >> > > > > > > >> > > > > at this point. Some of the use-cases were for
> > > more
> > > >> > > > > fine-grain
> > > >> > > > > > > >> latency
> > > >> > > > > > > >> > > > > collection, which could be done with Kafka
> > > Metrics.
> > > >> > > > Another
> > > >> > > > > > > >> use-case
> > > >> > > > > > > >> > > was
> > > >> > > > > > > >> > > > > encryption. However, there are several design
> > > >> options
> > > >> > > for
> > > >> > > > > > > >> encryption.
> > > >> > > > > > > >> > > One
> > > >> > > > > > > >> > > > > is to do per-record encryption which would
> > > require
> > > >> > > adding
> > > >> > > > > > > >> > > > > ProducerInterceptor.onEnqueued() and
> > > >> > > > > > > >> ConsumerInterceptor.onReceive().
> > > >> > > > > > > >> > > One
> > > >> > > > > > > >> > > > > could argue that in that case encryption
> could
> > be
> > > >> done
> > > >> > > by
> > > >> > > > > > > adding a
> > > >> > > > > > > >> > > custom
> > > >> > > > > > > >> > > > > serializer/deserializer. Another option is to
> > do
> > > >> > > > encryption
> > > >> > > > > > > after
> > > >> > > > > > > >> > > message
> > > >> > > > > > > >> > > > > gets compressed, but there are issues that
> > arise
> > > >> > > regarding
> > > >> > > > > > > broker
> > > >> > > > > > > >> > doing
> > > >> > > > > > > >> > > > > re-compression. We decided that it is better
> to
> > > >> have
> > > >> > > that
> > > >> > > > > > > >> discussion
> > > >> > > > > > > >> > > in a
> > > >> > > > > > > >> > > > > separate KIP and decide that this is
> something
> > we
> > > >> want
> > > >> > > to
> > > >> > > > do
> > > >> > > > > > > with
> > > >> > > > > > > >> > > > > interceptors or by other means.
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > Todd, Mayuresh and others who missed the KIP
> > > >> meeting,
> > > >> > > > please
> > > >> > > > > > let
> > > >> > > > > > > >> me
> > > >> > > > > > > >> > > know
> > > >> > > > > > > >> > > > > your thoughts on the scope we agreed on
> during
> > > the
> > > >> > > > meeting.
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > I will update the KIP proposal with the
> current
> > > >> > decision
> > > >> > > > by
> > > >> > > > > > end
> > > >> > > > > > > of
> > > >> > > > > > > >> > > today.
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > Thanks,
> > > >> > > > > > > >> > > > > Anna
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh
> > > Gharat <
> > > >> > > > > > > >> > > > > gharatmayures...@gmail.com> wrote:
> > > >> > > > > > > >> > > > >
> > > >> > > > > > > >> > > > > > Hi,
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > > I won't be able to make it to KIP hangout
> due
> > > to
> > > >> > > > conflict.
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > > Anna, here is the use case where knowing if
> > > there
> > > >> > are
> > > >> > > > > > messages
> > > >> > > > > > > >> in
> > > >> > > > > > > >> > the
> > > >> > > > > > > >> > > > > > RecordAccumulator left to be sent to the
> > kafka
> > > >> > cluster
> > > >> > > > > for a
> > > >> > > > > > > >> topic
> > > >> > > > > > > >> > is
> > > >> > > > > > > >> > > > > > useful.
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > > 1) Consider a pipeline :
> > > >> > > > > > > >> > > > > > A ---> Mirror-maker -----> B
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > > 2) We have a topic T in cluster A mirrored
> to
> > > >> > cluster
> > > >> > > B.
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > > 3) Now if we delete topic T in A and
> > > immediately
> > > >> > > proceed
> > > >> > > > > to
> > > >> > > > > > > >> delete
> > > >> > > > > > > >> > > the
> > > >> > > > > > > >> > > > > > topic in cluster B, some of the
> > > Mirror-maker
> > > >> > > > machines
> > > >> > > > > > die
> > > >> > > > > > > >> > because
> > > >> > > > > > > >> > > > > > at least one of the batches in
> > RecordAccumulator
> > > >> for
> > > >> > > > topic
> > > >> > > > > T
> > > >> > > > > > > >> fails to
> > > >> > > > > > > >> > > be
> > > >> > > > > > > >> > > > > > produced to cluster B. We have seen this
> > > >> happening
> > > >> > in
> > > >> > > > our
> > > >> > > > > > > >> clusters.
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > > If we know that there are no more messages
> > left
> > > >> in
> > > >> > the
> > > >> > > > > > > >> > > > RecordAccumulator
> > > >> > > > > > > >> > > > > to
> > > >> > > > > > > >> > > > > > be produced to cluster B, we can safely
> > delete
> > > >> the
> > > >> > > topic
> > > >> > > > > in
> > > >> > > > > > > >> > cluster B
> > > >> > > > > > > >> > > > > > without disturbing the pipeline.
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > > Thanks,
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > > Mayuresh
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > > On Tue, Jan 26, 2016 at 10:31 AM, Anna
> > Povzner
> > > <
> > > >> > > > > > > >> a...@confluent.io>
> > > >> > > > > > > >> > > > > wrote:
> > > >> > > > > > > >> > > > > >
> > > >> > > > > > > >> > > > > > > Thanks Ismael and Todd for your feedback!
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > I agree about coming up with lean, but
> > useful
> > > >> > > > interfaces
> > > >> > > > > > > that
> > > >> > > > > > > >> > will
> > > >> > > > > > > >> > > be
> > > >> > > > > > > >> > > > > > easy
> > > >> > > > > > > >> > > > > > > to extend later.
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > When we discuss the minimal set of
> producer
> > > and
> > > >> > > > consumer
> > > >> > > > > > > >> > > interceptor
> > > >> > > > > > > >> > > > > API
> > > >> > > > > > > >> > > > > > in
> > > >> > > > > > > >> > > > > > > today’s KIP meeting (discussion item #2
> in
> > my
> > > >> > > previous
> > > >> > > > > > > email),
> > > >> > > > > > > >> > let's
> > > >> > > > > > > >> > > > > > compare
> > > >> > > > > > > >> > > > > > > two options:
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > *1. Minimal set of immutable API for
> > producer
> > > >> and
> > > >> > > > > consumer
> > > >> > > > > > > >> > > > > interceptors*
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > ProducerInterceptor:
> > > >> > > > > > > >> > > > > > > public void onSend(ProducerRecord<K, V>
> > > >> record);
> > > >> > > > > > > >> > > > > > > public void
> > onAcknowledgement(RecordMetadata
> > > >> > > metadata,
> > > >> > > > > > > >> Exception
> > > >> > > > > > > >> > > > > > > exception);
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > ConsumerInterceptor:
> > > >> > > > > > > >> > > > > > > public void onConsume(ConsumerRecords<K,
> V>
> > > >> > > records);
> > > >> > > > > > > >> > > > > > > public void onCommit(Map<TopicPartition,
> > > >> > > > > > OffsetAndMetadata>
> > > >> > > > > > > >> > > offsets);
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > Use-cases:
> > > >> > > > > > > >> > > > > > > — end-to-end monitoring; custom tracing
> and
> > > >> > logging
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > *2. Minimal set of mutable API for
> producer
> > > and
> > > >> > > > consumer
> > > >> > > > > > > >> > > > interceptors*
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > ProducerInterceptor:
> > > >> > > > > > > >> > > > > > > ProducerRecord<K, V>
> > onSend(ProducerRecord<K,
> > > >> V>
> > > >> > > > > record);
> > > >> > > > > > > >> > > > > > > void onAcknowledgement(RecordMetadata
> > > metadata,
> > > >> > > > > Exception
> > > >> > > > > > > >> > > exception);
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > ConsumerInterceptor:
> > > >> > > > > > > >> > > > > > > void onConsume(ConsumerRecords<K, V>
> > > records);
> > > >> > > > > > > >> > > > > > > void onCommit(Map<TopicPartition,
> > > >> > OffsetAndMetadata>
> > > >> > > > > > > offsets);
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > Additional use-cases to #1:
> > > >> > > > > > > >> > > > > > > — Ability to add metadata to a message or
> > > fill
> > > >> in
> > > >> > > > > standard
> > > >> > > > > > > >> fields
> > > >> > > > > > > >> > > for
> > > >> > > > > > > >> > > > > > audit
> > > >> > > > > > > >> > > > > > > and routing.
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > Implications
> > > >> > > > > > > >> > > > > > > — Partition assignment will be done based
> > on
> > > >> > > modified
> > > >> > > > > > > >> key/value
> > > >> > > > > > > >> > > > instead
> > > >> > > > > > > >> > > > > > of
> > > >> > > > > > > >> > > > > > > original key/value. If key/value
> > > >> transformation is
> > > >> > > not
> > > >> > > > > > > >> consistent
> > > >> > > > > > > >> > > > (same
> > > >> > > > > > > >> > > > > > key
> > > >> > > > > > > >> > > > > > > and value does not mutate to the same,
> but
> > > >> > modified,
> > > >> > > > > > > >> key/value),
> > > >> > > > > > > >> > > then
> > > >> > > > > > > >> > > > > log
> > > >> > > > > > > >> > > > > > > compaction would not work. However, audit
> > and
> > > >> > > routing
> > > >> > > > > > > >> use-cases
> > > >> > > > > > > >> > > from
> > > >> > > > > > > >> > > > > Todd
> > > >> > > > > > > >> > > > > > > will likely do consistent transformation.
> > > >> > > > > > > >> > > > > > >
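To make the partition-assignment implication above concrete, here is a minimal sketch of the mutable onSend variant. SketchRecord, MutatingInterceptor, PrefixKeyInterceptor and partitionFor are hypothetical stand-ins invented for illustration; they are not the Kafka client classes, and the toy hash partitioner is only an assumption, not Kafka's partitioner.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for ProducerRecord<K, V>.
final class SketchRecord {
    final String key;
    final String value;

    SketchRecord(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// Mutable variant: onSend returns the record that will actually be sent.
interface MutatingInterceptor {
    SketchRecord onSend(SketchRecord record);
}

// A consistent transformation: the same input key always maps to the
// same output key, so keyed semantics such as compaction still line up.
class PrefixKeyInterceptor implements MutatingInterceptor {
    @Override
    public SketchRecord onSend(SketchRecord record) {
        return new SketchRecord("audit-" + record.key, record.value);
    }
}

class MutableInterceptorSketch {
    // Toy partitioner (assumption): hash of the key modulo partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        MutatingInterceptor interceptor = new PrefixKeyInterceptor();
        SketchRecord first = interceptor.onSend(new SketchRecord("user-1", "v1"));
        SketchRecord second = interceptor.onSend(new SketchRecord("user-1", "v2"));
        // The partition is computed from the modified key, not the original.
        System.out.println(first.key + " -> " + partitionFor(first.key, 8));
        System.out.println(second.key + " -> " + partitionFor(second.key, 8));
    }
}
```

Because the transformation is deterministic, both records end up with the same modified key and therefore the same partition. An interceptor that mixed in, say, a timestamp would break that property.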
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > *Additional callbacks (discussion item #3
> > in
> > > my
> > > >> > > > previous
> > > >> > > > > > > >> email):*
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > If we want to support encryption, we
> would
> > > >> want to
> > > >> > > be
> > > >> > > > > able
> > > >> > > > > > > to
> > > >> > > > > > > >> > > modify
> > > >> > > > > > > >> > > > > > > serialized key/value, rather than key and
> > > value
> > > >> > > > objects.
> > > >> > > > > > > This
> > > >> > > > > > > >> > will
> > > >> > > > > > > >> > > > add
> > > >> > > > > > > >> > > > > > the
> > > >> > > > > > > >> > > > > > > following API to producer and consumer
> > > >> > interceptors:
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > ProducerInterceptor:
> > > >> > > > > > > >> > > > > > > SerializedKeyValue
> > onEnqueued(TopicPartition
> > > >> tp,
> > > >> > > > > > > >> > ProducerRecord<K,
> > > >> > > > > > > >> > > V>
> > > >> > > > > > > >> > > > > > > record, SerializedKeyValue
> > > serializedKeyValue);
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > ConsumerInterceptor:
> > > >> > > > > > > >> > > > > > > SerializedKeyValue
> onReceive(TopicPartition
> > > tp,
> > > >> > > > > > > >> > SerializedKeyValue
> > > >> > > > > > > >> > > > > > > serializedKeyValue);
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > I am leaning towards implementing the
> > minimal
> > > >> set
> > > >> > of
> > > >> > > > > > > >> immutable or
> > > >> > > > > > > >> > > > > mutable
> > > >> > > > > > > >> > > > > > > interfaces, making sure that we have a
> > > >> > compatibility
> > > >> > > > > plan
> > > >> > > > > > > that
> > > >> > > > > > > >> > > allows
> > > >> > > > > > > >> > > > > us
> > > >> > > > > > > >> > > > > > to
> > > >> > > > > > > >> > > > > > > add more callbacks in the future (per
> > Ismael
> > > >> > > comment),
> > > >> > > > > and
> > > >> > > > > > > add
> > > >> > > > > > > >> > more
> > > >> > > > > > > >> > > > > APIs
> > > >> > > > > > > >> > > > > > > later. E.g., for encryption use-case,
> there
> > > >> could
> > > >> > be
> > > >> > > > an
> > > >> > > > > > > >> argument
> > > >> > > > > > > >> > in
> > > >> > > > > > > >> > > > > doing
> > > >> > > > > > > >> > > > > > > encryption after message compression vs.
> > > >> > per-record
> > > >> > > > > > > encryption
> > > >> > > > > > > >> > that
> > > >> > > > > > > >> > > > > could
> > > >> > > > > > > >> > > > > > > be done using the above additional API.
> > There
> > > >> are
> > > >> > > also
> > > >> > > > > more
> > > >> > > > > > > >> > > > implications
> > > >> > > > > > > >> > > > > > for
> > > >> > > > > > > >> > > > > > > every API that modifies records:
> modifying
> > > >> > > serialized
> > > >> > > > > > > >> key/value
> > > >> > > > > > > >> > > will
> > > >> > > > > > > >> > > > > > again
> > > >> > > > > > > >> > > > > > > impact partition assignment (we will
> likely
> > > do
> > > >> > that
> > > >> > > > > after
> > > >> > > > > > > >> > partition
> > > >> > > > > > > >> > > > > > > assignment), which may impact log
> > compaction
> > > >> and
> > > >> > > > mirror
> > > >> > > > > > > maker
> > > >> > > > > > > >> > > > > > partitioning.
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > Thanks,
> > > >> > > > > > > >> > > > > > > Anna
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > On Tue, Jan 26, 2016 at 7:26 AM, Todd
> > Palino
> > > <
> > > >> > > > > > > >> tpal...@gmail.com>
> > > >> > > > > > > >> > > > > wrote:
> > > >> > > > > > > >> > > > > > >
> > > >> > > > > > > >> > > > > > > > Finally got a chance to take a look at
> > > this.
> > > >> I
> > > >> > > won’t
> > > >> > > > > be
> > > >> > > > > > > >> able to
> > > >> > > > > > > >> > > > make
> > > >> > > > > > > >> > > > > > the
> > > >> > > > > > > >> > > > > > > > KIP meeting due to a conflict.
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > > I’m somewhat disappointed in this
> > > proposal. I
> > > >> > > think
> > > >> > > > > that
> > > >> > > > > > > the
> > > >> > > > > > > >> > > > explicit
> > > >> > > > > > > >> > > > > > > > exclusion of modification of the
> messages
> > > is
> > > >> > > > > > > short-sighted,
> > > >> > > > > > > >> and
> > > >> > > > > > > >> > > not
> > > >> > > > > > > >> > > > > > > > accounting for it now is going to bite
> us
> > > >> later.
> > > >> > > > Jay,
> > > >> > > > > > > aren’t
> > > >> > > > > > > >> > you
> > > >> > > > > > > >> > > > the
> > > >> > > > > > > >> > > > > > one
> > > >> > > > > > > >> > > > > > > > railing against public interfaces and
> how
> > > >> > > difficult
> > > >> > > > > they
> > > >> > > > > > > >> are to
> > > >> > > > > > > >> > > > work
> > > >> > > > > > > >> > > > > > with
> > > >> > > > > > > >> > > > > > > > when you don’t get them right? The
> > “simple”
> > > >> > change
> > > >> > > > to
> > > >> > > > > > one
> > > >> > > > > > > of
> > > >> > > > > > > >> > > these
> > > >> > > > > > > >> > > > > > > > interfaces to make it able to return a
> > > >> record is
> > > >> > > > going
> > > >> > > > > > to
> > > >> > > > > > > >> be a
> > > >> > > > > > > >> > > > > > > significant
> > > >> > > > > > > >> > > > > > > > change and is going to require all
> > clients
> > > to
> > > >> > > > rewrite
> > > >> > > > > > > their
> > > >> > > > > > > >> > > > > > interceptors.
> > > >> > > > > > > >> > > > > > > > If we’re not willing to put in the time to
> > > think
> > > >> > > > through
> > > >> > > > > > > >> > > manipulation
> > > >> > > > > > > >> > > > > now,
> > > >> > > > > > > >> > > > > > > > then this KIP should be shelved until
> we
> > > are.
> > > >> > > > > > Implementing
> > > >> > > > > > > >> > > > something
> > > >> > > > > > > >> > > > > > > > halfway is going to be worse than
> taking
> > a
> > > >> > little
> > > >> > > > > > longer.
> > > >> > > > > > > In
> > > >> > > > > > > >> > > > > addition,
> > > >> > > > > > > >> > > > > > I
> > > >> > > > > > > >> > > > > > > > don’t believe that manipulation
> requires
> > > >> > anything
> > > >> > > > more
> > > >> > > > > > > than
> > > >> > > > > > > >> > > > > > interceptors
> > > >> > > > > > > >> > > > > > > to
> > > >> > > > > > > >> > > > > > > > receive the full record, and then to
> > return
> > > >> it.
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > > There are 3 use cases I can think of
> right
> > > now
> > > >> > > > without
> > > >> > > > > > any
> > > >> > > > > > > >> deep
> > > >> > > > > > > >> > > > > > discussion
> > > >> > > > > > > >> > > > > > > > that can make use of interceptors with
> > > >> > > modification:
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > > 1. Auditing. The ability to add
> metadata
> > > to a
> > > >> > > > message
> > > >> > > > > > for
> > > >> > > > > > > >> > > auditing
> > > >> > > > > > > >> > > > is
> > > >> > > > > > > >> > > > > > > > critical. Hostname, service name,
> > > timestamps,
> > > >> > etc.
> > > >> > > > are
> > > >> > > > > > all
> > > >> > > > > > > >> > pieces
> > > >> > > > > > > >> > > > of
> > > >> > > > > > > >> > > > > > data
> > > >> > > > > > > >> > > > > > > > that can be used on the other side of
> the
> > > >> > pipeline
> > > >> > > > to
> > > >> > > > > > > >> > categorize
> > > >> > > > > > > >> > > > > > > messages,
> > > >> > > > > > > >> > > > > > > > determine loss and transport time, and
> > pin
> > > >> down
> > > >> > > > > issues.
> > > >> > > > > > > You
> > > >> > > > > > > >> may
> > > >> > > > > > > >> > > say
> > > >> > > > > > > >> > > > > > that
> > > >> > > > > > > >> > > > > > > > these things can just be part of the
> > > message
> > > >> > > schema,
> > > >> > > > > but
> > > >> > > > > > > >> anyone
> > > >> > > > > > > >> > > who
> > > >> > > > > > > >> > > > > has
> > > >> > > > > > > >> > > > > > > > worked with a multi-user data system
> > > >> (especially
> > > >> > > > those
> > > >> > > > > > who
> > > >> > > > > > > >> have
> > > >> > > > > > > >> > > > been
> > > >> > > > > > > >> > > > > > > > involved with LinkedIn) knows how
> > difficult
> > > >> it is
> > > >> > > to
> > > >> > > > > > > maintain
> > > >> > > > > > > >> > > > > consistent
> > > >> > > > > > > >> > > > > > > > message schemas and to get other people
> > to
> > > >> put
> > > >> > in
> > > >> > > > > fields
> > > >> > > > > > > for
> > > >> > > > > > > >> > your
> > > >> > > > > > > >> > > > > use.
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > > 2. Encryption. This is probably the
> most
> > > >> obvious
> > > >> > > > case
> > > >> > > > > > for
> > > >> > > > > > > >> > record
> > > >> > > > > > > >> > > > > > > > manipulation on both sides. The ability
> > to
> > > >> tie
> > > >> > in
> > > >> > > > end
> > > >> > > > > to
> > > >> > > > > > > end
> > > >> > > > > > > >> > > > > encryption
> > > >> > > > > > > >> > > > > > > is
> > > >> > > > > > > >> > > > > > > > important for data that requires
> external
> > > >> > > compliance
> > > >> > > > > > (PCI,
> > > >> > > > > > > >> > HIPAA,
> > > >> > > > > > > >> > > > > > etc.).
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > > 3. Routing. By being able to add a bit
> of
> > > >> > > > information
> > > >> > > > > > > about
> > > >> > > > > > > >> the
> > > >> > > > > > > >> > > > > source
> > > >> > > > > > > >> > > > > > or
> > > >> > > > > > > >> > > > > > > > destination of a message to the
> metadata,
> > > you
> > > >> > can
> > > >> > > > > easily
> > > >> > > > > > > >> > > construct
> > > >> > > > > > > >> > > > an
> > > >> > > > > > > >> > > > > > > > intelligent mirror maker that can
> prevent
> > > >> loops.
> > > >> > > > This
> > > >> > > > > > has
> > > >> > > > > > > >> the
> > > >> > > > > > > >> > > > > > opportunity
> > > >> > > > > > > >> > > > > > > > to result in significant operational
> > > >> savings, as
> > > >> > > you
> > > >> > > > > can
> > > >> > > > > > > get
> > > >> > > > > > > >> > rid
> > > >> > > > > > > >> > > of
> > > >> > > > > > > >> > > > > the
> > > >> > > > > > > >> > > > > > > > need for tiered clusters in order to
> > > prevent
> > > >> > loops
> > > >> > > > in
> > > >> > > > > > > >> mirroring
> > > >> > > > > > > >> > > > > > messages.
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > > All three of these share the feature
> that
> > > >> they
> > > >> > add
> > > >> > > > > > > metadata
> > > >> > > > > > > >> to
> > > >> > > > > > > >> > > > > > messages.
> > > >> > > > > > > >> > > > > > > > With the pushback on having arbitrary
> > > >> metadata
> > > >> > as
> > > >> > > an
> > > >> > > > > > > >> “envelope”
> > > >> > > > > > > >> > > to
> > > >> > > > > > > >> > > > > the
> > > >> > > > > > > >> > > > > > > > message, this is a way to provide it
> and
> > > >> make it
> > > >> > > the
> > > >> > > > > > > >> > > responsibility
> > > >> > > > > > > >> > > > > of
> > > >> > > > > > > >> > > > > > > the
> > > >> > > > > > > >> > > > > > > > client, and not the Kafka broker and
> > system
> > > >> > > itself.
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > > -Todd
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > > On Tue, Jan 26, 2016 at 2:30 AM, Ismael
> > > Juma
> > > >> <
> > > >> > > > > > > >> > ism...@juma.me.uk>
> > > >> > > > > > > >> > > > > > wrote:
> > > >> > > > > > > >> > > > > > > >
> > > >> > > > > > > >> > > > > > > > > Hi Anna and Neha,
> > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > I think it makes a lot of sense to
> try
> > > and
> > > >> > keep
> > > >> > > > the
> > > >> > > > > > > >> interface
> > > >> > > > > > > >> > > > lean
> > > >> > > > > > > >> > > > > > and
> > > >> > > > > > > >> > > > > > > to
> > > >> > > > > > > >> > > > > > > > > add more methods later when/if there
> > is a
> > > >> > need.
> > > >> > > > What
> > > >> > > > > > is
> > > >> > > > > > > >> the
> > > >> > > > > > > >> > > > current
> > > >> > > > > > > >> > > > > > > > > thinking with regards to
> compatibility
> > > >> when/if
> > > >> > > we
> > > >> > > > > add
> > > >> > > > > > > new
> > > >> > > > > > > >> > > > methods?
> > > >> > > > > > > >> > > > > A
> > > >> > > > > > > >> > > > > > > few
> > > >> > > > > > > >> > > > > > > > > options come to mind:
> > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > 1. Change the interface to an
> abstract
> > > >> class
> > > >> > > with
> > > >> > > > > > empty
> > > >> > > > > > > >> > > > > > implementations
> > > >> > > > > > > >> > > > > > > > for
> > > >> > > > > > > >> > > > > > > > > all the methods. This means that the
> > path
> > > >> to
> > > >> > > > adding
> > > >> > > > > > new
> > > >> > > > > > > >> > methods
> > > >> > > > > > > >> > > > is
> > > >> > > > > > > >> > > > > > > clear.
> > > >> > > > > > > >> > > > > > > > > 2. Hope we have moved to Java 8 by
> the
> > > >> time we
> > > >> > > > need
> > > >> > > > > to
> > > >> > > > > > > add
> > > >> > > > > > > >> > new
> > > >> > > > > > > >> > > > > > methods
> > > >> > > > > > > >> > > > > > > > and
> > > >> > > > > > > >> > > > > > > > > use default methods with an empty
> > > >> > implementation
> > > >> > > > for
> > > >> > > > > > any
> > > >> > > > > > > >> new
> > > >> > > > > > > >> > > > method
> > > >> > > > > > > >> > > > > > > (and
> > > >> > > > > > > >> > > > > > > > > potentially make existing methods
> > default
> > > >> > > methods
> > > >> > > > > too
> > > >> > > > > > at
> > > >> > > > > > > >> that
> > > >> > > > > > > >> > > > point
> > > >> > > > > > > >> > > > > > for
> > > >> > > > > > > >> > > > > > > > > consistency)
> > > >> > > > > > > >> > > > > > > > > 3. Introduce a new interface that
> > > inherits
> > > >> > from
> > > >> > > > the
> > > >> > > > > > > >> existing
> > > >> > > > > > > >> > > > > > > Interceptor
> > > >> > > > > > > >> > > > > > > > > interface when we need to add new
> > > methods.
> > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > Option 1 is the easiest and it also
> > means
> > > >> that
> > > >> > > > > > > interceptor
> > > >> > > > > > > >> > > users
> > > >> > > > > > > >> > > > > only
> > > >> > > > > > > >> > > > > > > > need
> > > >> > > > > > > >> > > > > > > > > to override the methods that they are
> > > >> > interested in
> > > >> > > > > (more
> > > >> > > > > > > >> useful
> > > >> > > > > > > >> > > if
> > > >> > > > > > > >> > > > > the
> > > >> > > > > > > >> > > > > > > > number
> > > >> > > > > > > >> > > > > > > > > of methods grows). The downside is
> that
> > > >> > > > interceptor
> > > >> > > > > > > >> > > > implementations
> > > >> > > > > > > >> > > > > > > > cannot
> > > >> > > > > > > >> > > > > > > > > inherit from another class (a
> > > >> straightforward
> > > >> > > > > > workaround
> > > >> > > > > > > >> is
> > > >> > > > > > > >> > to
> > > >> > > > > > > >> > > > make
> > > >> > > > > > > >> > > > > > the
> > > >> > > > > > > >> > > > > > > > > interceptor a forwarder that calls
> > > another
> > > >> > > class).
> > > >> > > > > > Also,
> > > >> > > > > > > >> our
> > > >> > > > > > > >> > > > > existing
> > > >> > > > > > > >> > > > > > > > > callbacks are interfaces, so seems a
> > bit
> > > >> > > > > inconsistent.
> > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > Option 2 may be the most appealing
> one
> > as
> > > >> both
> > > >> > > > users
> > > >> > > > > > and
> > > >> > > > > > > >> > > > ourselves
> > > >> > > > > > > >> > > > > > > retain
> > > >> > > > > > > >> > > > > > > > > flexibility. The main downside is
> that
> > it
> > > >> > relies
> > > >> > > > on
> > > >> > > > > us
> > > >> > > > > > > >> moving
> > > >> > > > > > > >> > > to
> > > >> > > > > > > >> > > > > Java
> > > >> > > > > > > >> > > > > > > 8,
> > > >> > > > > > > >> > > > > > > > > which may be more than a year away
> > > >> potentially
> > > >> > > (if
> > > >> > > > > we
> > > >> > > > > > > >> support
> > > >> > > > > > > >> > > the
> > > >> > > > > > > >> > > > > > last
> > > >> > > > > > > >> > > > > > > 2
> > > >> > > > > > > >> > > > > > > > > Java releases).
> > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > Thoughts?
> > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > Ismael
> > > >> > > > > > > >> > > > > > > > >
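As a rough illustration of option 2 above (Java 8 default methods), the sketch below shows how a method added later with an empty default body leaves existing implementations untouched. SketchInterceptor and LoggingInterceptor are hypothetical names with simplified String arguments; they are not the interfaces proposed in the KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the interceptor interface under discussion.
interface SketchInterceptor {
    // Original method that every implementation must provide.
    void onSend(String record);

    // Method added in a later release. The empty default body means
    // implementations written against the old interface keep compiling.
    default void onAcknowledgement(String record, Exception exception) {
    }
}

// An "old" implementation that only knows about onSend.
class LoggingInterceptor implements SketchInterceptor {
    final List<String> seen = new ArrayList<>();

    @Override
    public void onSend(String record) {
        seen.add(record);
    }
}

class DefaultMethodSketch {
    public static void main(String[] args) {
        LoggingInterceptor interceptor = new LoggingInterceptor();
        interceptor.onSend("r1");
        // Resolves to the empty default; no override is required.
        interceptor.onAcknowledgement("r1", null);
        System.out.println(interceptor.seen);
    }
}
```

Unlike the abstract-class approach in option 1, LoggingInterceptor remains free to extend another class, which is the flexibility the message refers to.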
> > > >> > > > > > > >> > > > > > > > > On Tue, Jan 26, 2016 at 4:59 AM, Neha
> > > >> > Narkhede <
> > > >> > > > > > > >> > > > n...@confluent.io>
> > > >> > > > > > > >> > > > > > > > wrote:
> > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > Anna,
> > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > I'm also in favor of including just
> > the
> > > >> APIs
> > > >> > > for
> > > >> > > > > > which
> > > >> > > > > > > >> we
> > > >> > > > > > > >> > > have
> > > >> > > > > > > >> > > > a
> > > >> > > > > > > >> > > > > > > clear
> > > >> > > > > > > >> > > > > > > > > use
> > > >> > > > > > > >> > > > > > > > > > case. If more use cases for finer
> > > >> monitoring
> > > >> > > > show
> > > >> > > > > up
> > > >> > > > > > > in
> > > >> > > > > > > >> the
> > > >> > > > > > > >> > > > > future,
> > > >> > > > > > > >> > > > > > > we
> > > >> > > > > > > >> > > > > > > > > can
> > > >> > > > > > > >> > > > > > > > > > always update the interface. Would
> > you
> > > >> > please
> > > >> > > > > > > highlight
> > > >> > > > > > > >> in
> > > >> > > > > > > >> > > the
> > > >> > > > > > > >> > > > > KIP
> > > >> > > > > > > >> > > > > > > the
> > > >> > > > > > > >> > > > > > > > > APIs
> > > >> > > > > > > >> > > > > > > > > > that you think we have an immediate
> > use
> > > >> for?
> > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > Joel,
> > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > Broker-side monitoring makes a lot
> of
> > > >> sense
> > > >> > in
> > > >> > > > the
> > > >> > > > > > > long
> > > >> > > > > > > >> > term
> > > >> > > > > > > >> > > > > > though I
> > > >> > > > > > > >> > > > > > > > > don't
> > > >> > > > > > > >> > > > > > > > > > think it is a requirement for
> > > end-to-end
> > > >> > > > > monitoring.
> > > >> > > > > > > >> With
> > > >> > > > > > > >> > the
> > > >> > > > > > > >> > > > > > > producer
> > > >> > > > > > > >> > > > > > > > > and
> > > >> > > > > > > >> > > > > > > > > > consumer interceptors, you have the
> > > >> ability
> > > >> > to
> > > >> > > > get
> > > >> > > > > > > full
> > > >> > > > > > > >> > > > > > > > > > publish-to-subscribe end-to-end
> > > >> monitoring.
> > > >> > > The
> > > >> > > > > > broker
> > > >> > > > > > > >> > > > > interceptor
> > > >> > > > > > > >> > > > > > > > > > certainly improves the resolution
> of
> > > >> > > monitoring
> > > >> > > > > but
> > > >> > > > > > it
> > > >> > > > > > > >> is
> > > >> > > > > > > >> > > also
> > > >> > > > > > > >> > > > a
> > > >> > > > > > > >> > > > > > > > riskier
> > > >> > > > > > > >> > > > > > > > > > change. I prefer an incremental
> > > approach
> > > >> > over
> > > >> > > a
> > > >> > > > > > > big-bang
> > > >> > > > > > > >> > and
> > > >> > > > > > > >> > > > > > > recommend
> > > >> > > > > > > >> > > > > > > > > > taking baby-steps. Let's first make
> > > sure
> > > >> the
> > > >> > > > > > > >> > > producer/consumer
> > > >> > > > > > > >> > > > > > > > > interceptors
> > > >> > > > > > > >> > > > > > > > > > are successful. And then come back
> > and
> > > >> add
> > > >> > the
> > > >> > > > > > broker
> > > >> > > > > > > >> > > > interceptor
> > > >> > > > > > > >> > > > > > > > > > carefully.
> > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > Having said that, it would be great
> > to
> > > >> > > > understand
> > > >> > > > > > your
> > > >> > > > > > > >> > > proposal
> > > >> > > > > > > >> > > > > for
> > > >> > > > > > > >> > > > > > > the
> > > >> > > > > > > >> > > > > > > > > > broker interceptor independently.
> We
> > > can
> > > >> > > either
> > > >> > > > > add
> > > >> > > > > > an
> > > >> > > > > > > >> > > > > interceptor
> > > >> > > > > > > >> > > > > > > > > > on-append or on-commit. If people
> > want
> > > to
> > > >> > use
> > > >> > > > this
> > > >> > > > > > for
> > > >> > > > > > > >> > > > > monitoring,
> > > >> > > > > > > >> > > > > > > then
> > > >> > > > > > > >> > > > > > > > > > possibly on-commit might be more
> > > useful?
> > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > Thanks,
> > > >> > > > > > > >> > > > > > > > > > Neha
> > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > On Mon, Jan 25, 2016 at 6:47 PM,
> Jay
> > > >> Kreps <
> > > >> > > > > > > >> > j...@confluent.io
> > > >> > > > > > > >> > > >
> > > >> > > > > > > >> > > > > > wrote:
> > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > Hey Joel,
> > > >> > > > > > > >> > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > What is the interface you are
> > > thinking
> > > >> of?
> > > >> > > > > > Something
> > > >> > > > > > > >> like
> > > >> > > > > > > >> > > > this:
> > > >> > > > > > > >> > > > > > > > > > >     onAppend(String topic, int
> > > >> partition,
> > > >> > > > > Records
> > > >> > > > > > > >> > records,
> > > >> > > > > > > >> > > > long
> > > >> > > > > > > >> > > > > > > time)
> > > >> > > > > > > >> > > > > > > > > > > ?
> > > >> > > > > > > >> > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > One challenge right now is that
> we
> > > are
> > > >> > still
> > > >> > > > > using
> > > >> > > > > > > the
> > > >> > > > > > > >> > old
> > > >> > > > > > > >> > > > > > > > > > > Message/MessageSet classes on the
> > > >> broker
> > > >> > > which
> > > >> > > > > I'm
> > > >> > > > > > > not
> > > >> > > > > > > >> > sure
> > > >> > > > > > > >> > > > if
> > > >> > > > > > > >> > > > > > we'd
> > > >> > > > > > > >> > > > > > > > > want
> > > >> > > > > > > >> > > > > > > > > > to
> > > >> > > > > > > >> > > > > > > > > > > support over the long haul but it
> > > >> might be
> > > >> > > > okay
> > > >> > > > > > just
> > > >> > > > > > > >> to
> > > >> > > > > > > >> > > > create
> > > >> > > > > > > >> > > > > > the
> > > >> > > > > > > >> > > > > > > > > > records
> > > >> > > > > > > >> > > > > > > > > > > instance for this interface.
> > > >> > > > > > > >> > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > -Jay
> > > >> > > > > > > >> > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > On Mon, Jan 25, 2016 at 12:37 PM,
> > > Joel
> > > >> > > Koshy <
> > > >> > > > > > > >> > > > > > jjkosh...@gmail.com>
> > > >> > > > > > > >> > > > > > > > > > wrote:
> > > >> > > > > > > >> > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > I'm definitely in favor of
> having
> > > >> such
> > > >> > > hooks
> > > >> > > > > in
> > > >> > > > > > > the
> > > >> > > > > > > >> > > > > > > produce/consume
> > > >> > > > > > > >> > > > > > > > > > > > life-cycle. Not sure if people
> > > >> remember
> > > >> > > this
> > > >> > > > > but
> > > >> > > > > > > in
> > > >> > > > > > > >> > Kafka
> > > >> > > > > > > >> > > > 0.7
> > > >> > > > > > > >> > > > > > > this
> > > >> > > > > > > >> > > > > > > > > was
> > > >> > > > > > > >> > > > > > > > > > > > pretty much how it was:
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > > >> > > > > > > >> > > > > > > > > > > > i.e., we had something similar
> to
> > > the
> > > >> > > > > > interceptor
> > > >> > > > > > > >> > > proposal
> > > >> > > > > > > >> > > > > for
> > > >> > > > > > > >> > > > > > > > > various
> > > >> > > > > > > >> > > > > > > > > > > > stages of the producer request.
> > The
> > > >> > > producer
> > > >> > > > > > > >> provided
> > > >> > > > > > > >> > > > > > call-backs
> > > >> > > > > > > >> > > > > > > > for
> > > >> > > > > > > >> > > > > > > > > > > > beforeEnqueue, afterEnqueue,
> > > >> > > afterDequeuing,
> > > >> > > > > > > >> > > beforeSending,
> > > >> > > > > > > >> > > > > > etc.
> > > >> > > > > > > >> > > > > > > So
> > > >> > > > > > > >> > > > > > > > > at
> > > >> > > > > > > >> > > > > > > > > > > > LinkedIn we in fact did
> auditing
> > > >> within
> > > >> > > > these
> > > >> > > > > > > >> > call-backs
> > > >> > > > > > > >> > > > (and
> > > >> > > > > > > >> > > > > > not
> > > >> > > > > > > >> > > > > > > > > > > > explicitly in the wrapper).
> Over
> > > time
> > > >> > and
> > > >> > > > with
> > > >> > > > > > 0.8
> > > >> > > > > > > >> we
> > > >> > > > > > > >> > > moved
> > > >> > > > > > > >> > > > > > that
> > > >> > > > > > > >> > > > > > > > out
> > > >> > > > > > > >> > > > > > > > > to
> > > >> > > > > > > >> > > > > > > > > > > the
> > > >> > > > > > > >> > > > > > > > > > > > wrapper libraries.
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > On a side-note while audit and other monitoring can be done internally
> > > >> > > > > > > >> > > > > > > > > > > > in a convenient way I think it should be clarified that having a wrapper
> > > >> > > > > > > >> > > > > > > > > > > > is in general not a bad idea and I would even consider it to be a
> > > >> > > > > > > >> > > > > > > > > > > > best-practice. Even with 0.7 we still had a wrapper library and that API
> > > >> > > > > > > >> > > > > > > > > > > > has largely stayed the same and has helped protect against (sometimes
> > > >> > > > > > > >> > > > > > > > > > > > backwards incompatible) changes in open source.
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > While we are on this topic I have one comment and Anna, you may have
> > > >> > > > > > > >> > > > > > > > > > > > already considered this but I don't see mention of it in the KIP:
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > Add a custom message interceptor/validator on the broker on message
> > > >> > > > > > > >> > > > > > > > > > > > arrival.
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > We decompress and do basic validation of messages on arrival. I think
> > > >> > > > > > > >> > > > > > > > > > > > there is value in supporting custom validation and expand it to support
> > > >> > > > > > > >> > > > > > > > > > > > custom on-arrival processing. Here is a specific use-case I have in mind.
> > > >> > > > > > > >> > > > > > > > > > > > The blog that James referenced describes our auditing infrastructure. In
> > > >> > > > > > > >> > > > > > > > > > > > order to audit the Kafka cluster itself we need to run a "console
> > > >> > > > > > > >> > > > > > > > > > > > auditor" service that consumes everything and spits out audit events back
> > > >> > > > > > > >> > > > > > > > > > > > to the cluster. I would prefer not having to run this service because:
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > >    - Well, it is one more service that we have to run and monitor
> > > >> > > > > > > >> > > > > > > > > > > >    - Consuming everything takes up bandwidth which can be avoided
> > > >> > > > > > > >> > > > > > > > > > > >    - The console auditor consumer itself can lag and cause temporary
> > > >> > > > > > > >> > > > > > > > > > > >    audit discrepancies
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > One way we can mitigate this is by having mirror-makers in between
> > > >> > > > > > > >> > > > > > > > > > > > clusters emit audit events. The problem is that the very last cluster in
> > > >> > > > > > > >> > > > > > > > > > > > the pipeline will not have any audit which is why we need to have
> > > >> > > > > > > >> > > > > > > > > > > > something to audit the cluster.
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > If we had a custom message validator then the audit can be done
> > > >> > > > > > > >> > > > > > > > > > > > on-arrival and we won't need a console auditor.
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > One potential issue in this approach and any elaborate on-arrival
> > > >> > > > > > > >> > > > > > > > > > > > processing for that matter is that you may need to deserialize the
> > > >> > > > > > > >> > > > > > > > > > > > message as well which can drive up produce request handling times.
> > > >> > > > > > > >> > > > > > > > > > > > However I'm not terribly concerned about that especially if the audit
> > > >> > > > > > > >> > > > > > > > > > > > header can be separated out easily or even deserialized partially as this
> > > >> > > > > > > >> > > > > > > > > > > > Avro thread touches on
> > > >> > > > > > > >> > > > > > > > > > > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > Thanks,
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > Joel
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > Nice KIP. Excellent idea.
> > > >> > > > > > > >> > > > > > > > > > > > > Was just thinking if we can add onDequed() to the ProducerInterceptor
> > > >> > > > > > > >> > > > > > > > > > > > > interface. Since we have the onEnqueued(), it will help the client or
> > > >> > > > > > > >> > > > > > > > > > > > > the tools to know how much time the message spent in the
> > > >> > > > > > > >> > > > > > > > > > > > > RecordAccumulator. Also an API to check if there are any messages left
> > > >> > > > > > > >> > > > > > > > > > > > > for a particular topic in the RecordAccumulator would help.
> > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > Thanks,
> > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > Mayuresh
> > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com> wrote:
> > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > Great idea. I’ve been talking about this for 2 years, and I’m glad
> > > >> > > > > > > >> > > > > > > > > > > > > > someone is finally picking it up. Will take a look at the KIP at some
> > > >> > > > > > > >> > > > > > > > > > > > > > point shortly.
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > -Todd
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> wrote:
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > Hey Becket,
> > > >> > > > > > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > Yeah this is really similar to the callback. The difference is really
> > > >> > > > > > > >> > > > > > > > > > > > > > > in who sets the behavior. The idea of the interceptor is that it
> > > >> > > > > > > >> > > > > > > > > > > > > > > doesn't require any code change in apps so you can globally add
> > > >> > > > > > > >> > > > > > > > > > > > > > > behavior to your Kafka usage without changing app code. Whereas the
> > > >> > > > > > > >> > > > > > > > > > > > > > > callback is added by the app. The idea is to kind of obviate the need
> > > >> > > > > > > >> > > > > > > > > > > > > > > for the wrapper code that e.g. LinkedIn maintains to hold this kind of
> > > >> > > > > > > >> > > > > > > > > > > > > > > stuff.
> > > >> > > > > > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > -Jay
> > > >> > > > > > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket....@gmail.com> wrote:
> > > >> > > > > > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > This could be a useful feature. And I think there are some use cases
> > > >> > > > > > > >> > > > > > > > > > > > > > > > to mutate the data like rejected alternative one mentioned.
> > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > I am wondering if there is functional overlapping between
> > > >> > > > > > > >> > > > > > > > > > > > > > > > ProducerInterceptor.onAcknowledgement() and the producer callback? I
> > > >> > > > > > > >> > > > > > > > > > > > > > > > can see that the Callback could be a per record setting while
> > > >> > > > > > > >> > > > > > > > > > > > > > > > onAcknowledgement() is a producer level setting. Other than that, is
> > > >> > > > > > > >> > > > > > > > > > > > > > > > there any difference between them?
> > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <n...@confluent.io> wrote:
> > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > James,
> > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > That is one of the many monitoring use cases for the interceptor
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > interface.
> > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > Neha
> > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com> wrote:
> > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > Anna,
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > I'm trying to understand a concrete use case. It sounds like producer
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > interceptors could be used to implement part of LinkedIn's Kafka Audit
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > tool?
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > https://engineering.linkedin.com/kafka/running-kafka-scale
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > Part of that is done by a wrapper library around the kafka producer
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > that keeps a count of the number of messages produced, and then sends
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > that count to a side-topic. It sounds like the producer interceptors
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > could possibly be used to implement that?
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > -James
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner <a...@confluent.io> wrote:
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > Hi,
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > I just created a KIP-42 for adding producer and consumer interceptors
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > for intercepting messages at different points on producer and
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > consumer.
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > Comments and suggestions are welcome!
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > Anna
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > --
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > > > >> > > > > > > > > > > > > > > > > Neha
> > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > --
> > > >> > > > > > > >> > > > > > > > > > > > > > *—-*
> > > >> > > > > > > >> > > > > > > > > > > > > > *Todd Palino*
> > > >> > > > > > > >> > > > > > > > > > > > > > Staff Site Reliability
> > Engineer
> > > >> > > > > > > >> > > > > > > > > > > > > > Data Infrastructure
> Streaming
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > > linkedin.com/in/toddpalino
> > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > > > >> > > > > > > > > > > > > --
> > > >> > > > > > > >> > > > > > > > > > > > > -Regards,
> > > >> > > > > > > >> > > > > > > > > > > > > Mayuresh R. Gharat
> > > >> > > > > > > >> > > > > > > > > > > > > (862) 250-7125
> > > >> > > > > > > >> > > > > > > > > > > > >
>
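
The audit-count idea from James's message in the quoted thread above (a hook on the producer that counts messages and then sends the count to a side-topic) can be sketched roughly as follows. This is only an illustration in Python, not the KIP-42 interface or the Kafka client API: the class name AuditCountInterceptor, the methods on_acknowledgement and flush, the publish callable and the "audit-events" topic name are all hypothetical stand-ins chosen for the example.

```python
from collections import Counter
from typing import Callable, Dict, Optional


class AuditCountInterceptor:
    """Hypothetical stand-in for a producer interceptor (not the Kafka API)."""

    def __init__(self, publish: Callable[[str, Dict[str, int]], None],
                 audit_topic: str = "audit-events") -> None:
        # `publish` is an assumed callable that sends a payload to a topic.
        self._publish = publish
        self._audit_topic = audit_topic
        self._acked = Counter()   # per-topic count of acknowledged records
        self._failed = Counter()  # per-topic count of failed sends

    def on_acknowledgement(self, topic: str, error: Optional[Exception]) -> None:
        # Called once per record when the send completes or fails.
        if error is None:
            self._acked[topic] += 1
        else:
            self._failed[topic] += 1

    def flush(self) -> None:
        # Emit the accumulated counts to the side topic, then reset them.
        if self._acked:
            self._publish(self._audit_topic, dict(self._acked))
        self._acked.clear()


# Usage: collect published audit events in a list instead of a real cluster.
sent = []
interceptor = AuditCountInterceptor(lambda topic, counts: sent.append((topic, counts)))
for _ in range(3):
    interceptor.on_acknowledgement("page-views", None)
interceptor.on_acknowledgement("page-views", RuntimeError("produce failed"))
interceptor.flush()
print(sent)
```

Because the hook is attached to the producer rather than passed with each send, it matches Jay's point that the behavior can be added without changing application code; how often flush would be called is left open here.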
