Regarding record size as bytes sent over the wire, my concern is that it is
almost impossible to calculate per record. We could compute it as: 1)
compressed bytes / number of records in a compressed message, as Todd
mentioned; or 2) the same as #1, but weighted by the ratio of the record's
uncompressed size to the total uncompressed size of the records. Either
calculation only gives us an estimate. So maybe record size as bytes sent
over the wire is not per-record metadata, but rather a per-topic/partition
measure that is better exposed through metrics?
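
To make the two estimates concrete, here is a rough sketch. The class and
method names (WireSizeEstimate, evenSplit, proportional) are hypothetical and
not part of any Kafka API; the inputs are assumed to be known for one
compressed message.

```java
// Sketch only: WireSizeEstimate and its methods are hypothetical helpers,
// not Kafka API. Both methods return an estimate, not actual wire bytes.
final class WireSizeEstimate {
    private WireSizeEstimate() {}

    // Estimate 1: split the compressed size evenly across all records
    // in the compressed message.
    static double evenSplit(long compressedBytes, int recordCount) {
        if (recordCount <= 0) {
            throw new IllegalArgumentException("recordCount must be positive");
        }
        return (double) compressedBytes / recordCount;
    }

    // Estimate 2: charge each record a share of the compressed size
    // proportional to its uncompressed size.
    static double proportional(long compressedBytes,
                               long recordUncompressedBytes,
                               long totalUncompressedBytes) {
        if (totalUncompressedBytes <= 0) {
            throw new IllegalArgumentException("totalUncompressedBytes must be positive");
        }
        return (double) compressedBytes * recordUncompressedBytes / totalUncompressedBytes;
    }
}
```

For a 1000-byte compressed message holding four records, estimate 1 assigns
250 bytes to each record, while estimate 2 assigns 750 bytes to a record that
makes up 300 of the 400 uncompressed bytes. Neither number is what that
record actually cost on the wire.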


On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino <tpal...@gmail.com> wrote:

> It may be difficult (or nearly impossible) to get actual compressed bytes
> for a message from a compressed batch, but I do think it’s useful
> information to have available for the very reason noted, bandwidth
> consumed. Does it make sense to have an interceptor at the batch level that
> can provide this? The other option is to estimate it (such as making an
> assumption that the messages in a batch are equal in size, which is not
> necessarily true), which is probably not the right answer.
>
> -Todd
>
>
> On Thu, Jan 28, 2016 at 1:48 PM, Anna Povzner <a...@confluent.io> wrote:
>
> > Hi Becket,
> >
> > It will be up to the interceptor to implement their audit or monitoring
> > strategy. I would also imagine there is more than one good way to do
> audit.
> > So, I agree that some of the interceptors may not use CRC, and we will
> not
> > require it. The question is now whether intercepting CRCs is needed. I
> > think they are very useful for monitoring and audit, because CRC provides
> > an easy way to get a summary of a message, rather than using message
> > bytes or key/value objects.
> >
> > Regarding record size, I agree that the bandwidth example was not a good
> one. I
> > think it would be hard to get actual bytes sent over the wire (your #2),
> > since multiple records get compressed together and we would need to
> decide
> > which bytes to account to which record. So I am inclined to only do your
> > #1. However, it still makes more sense to me just to return record size
> > including the header, since this is the actual record size.
> >
> > Thanks,
> > Anna
> >
> > On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin <becket....@gmail.com>
> wrote:
> >
> > > Anna,
> > >
> > > Using CRC to do end2end auditing might be very costly because you will
> > need
> > > to collect all the CRC from both producer and consumer. And it is based
> > on
> > > the assumption that broker does not modify the record.
> > > Can you shed some light on how end-to-end auditing would use the CRC
> > > before we decide to expose such low level detail to the end user? It
> > would
> > > also be helpful if you can compare it with something like sequence
> number
> > > based auditing.
> > >
> > > About the record size, one thing worth noting is that the size of
> Record
> > is
> > > not the actual bytes sent over the wire if we use compression. So that
> > does
> > > not really tell user how much bandwidth they are using. Personally I
> > think
> > > two kinds of size may be useful.
> > > 1. The record size after serialization, i.e. application bytes. (The
> > > uncompressed record size can be easily derived as well)
> > > 2. The actual bytes sent over the wire.
> > > We can get (1) easily, but (2) is difficult to get at Record level when
> > we
> > > use compression.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner <a...@confluent.io>
> > wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > The use-case for CRC is end-to-end audit, rather than checking
> whether
> > a
> > > > single message is corrupt or not.
> > > >
> > > > Regarding record size, I was thinking to extract record size from
> > Record.
> > > > That will include header overhead as well. I think total record size
> > will
> > > > tell users how much bandwidth their messages take. Since header is
> > > > relatively small and constant, users also will get an idea of their
> > > > key/value sizes.
> > > >
> > > > Thanks,
> > > > Anna
> > > >
> > > > On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin <becket....@gmail.com>
> > > wrote:
> > > >
> > > > > I am +1 on #1.2 and #3.
> > > > >
> > > > > #2: Regarding CRC, I am not sure if users care about CRC. Is there
> > any
> > > > > specific use case? Currently we validate messages by calling
> > > > ensureValid()
> > > > > to verify the checksum and throw exception if it does not match.
> > > > >
> > > > > Message size would be useful. We can add that to ConsumerRecord.
> Can
> > > you
> > > > > clarify the message size you are referring to? Does it include the
> > > > message
> > > > > header overhead or not? From user's point of view, they probably
> > don't
> > > > care
> > > > > about header size.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede <n...@confluent.io>
> > > > wrote:
> > > > >
> > > > > > Anna,
> > > > > >
> > > > > > Thanks for being diligent.
> > > > > >
> > > > > > +1 on #1.2 and sounds good on #3. I recommend adding checksum and
> > > size
> > > > > > fields to RecordMetadata and ConsumerRecord instead of exposing
> > > > metadata
> > > > > > piecemeal in the interceptor APIs.
> > > > > >
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner <a...@confluent.io
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > >
> > > > > > > The KIP wiki page is now up-to-date with the scope we have
> agreed
> > > on:
> > > > > > > Producer and Consumer Interceptors with a minimal set of
> mutable
> > > API
> > > > > that
> > > > > > > are not dependent on producer and consumer internal
> > implementation.
> > > > > > >
> > > > > > > I have a few more API details that I would like to bring
> attention
> > to
> > > > > > or/and
> > > > > > > discuss:
> > > > > > >
> > > > > > > 1. Handling exceptions
> > > > > > >
> > > > > > > Exceptions can provide an additional level of control. For
> > example,
> > > > we
> > > > > > can
> > > > > > > filter messages on consumer side or stop messages on producer
> if
> > > they
> > > > > > don’t
> > > > > > > have the right field.
> > > > > > >
> > > > > > > I see two options:
> > > > > > > 1.1. For callbacks that can mutate records (onSend and
> > onConsume),
> > > > > > > propagate exceptions through the original calls
> > > (KafkaProducer.send()
> > > > > and
> > > > > > > KafkaConsumer.poll() respectively). For other callbacks, catch
> > > > > exception,
> > > > > > > log, and ignore.
> > > > > > > 1.2. Catch exceptions from all the interceptor callbacks and
> > > ignore.
> > > > > > >
> > > > > > > The issue with 1.1. is that it effectively changes
> > > > KafkaProducer.send()
> > > > > > and
> > > > > > > KafkaConsumer.poll() API, since now they may throw exceptions
> > that
> > > > are
> > > > > > not
> > > > > > > documented in KafkaProducer/Consumer API. Another option is to
> > > allow
> > > > to
> > > > > > > propagate some exceptions, and ignore others.
> > > > > > >
> > > > > > > I think our use-cases do not require propagating exceptions.
> So,
> > I
> > > > > > propose
> > > > > > > option 1.2. Unless someone has suggestion/use-cases for
> > propagating
> > > > > > > exceptions. Please let me know.
> > > > > > >
> > > > > > > 2. Intercepting record CRC and record size
> > > > > > >
> > > > > > > Since we decided not to add any intermediate callbacks (such as
> > > > > onEnqueue
> > > > > > > or onReceive) to interceptors, I think it is still valuable to
> > > > > intercept
> > > > > > > record CRC and record size in bytes for monitoring and audit
> > > > use-cases.
> > > > > > >
> > > > > > > I propose to add checksum and size fields to RecordMetadata and
> > > > > > > ConsumerRecord. Another option would be to add them as
> parameters
> > > in
> > > > > > > onAcknowledgement() and onConsume() callbacks.
> > > > > > >
> > > > > > > 3. Callbacks that allow to modify records look as follows:
> > > > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> > > > > > > ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
> > > > > > >
> > > > > > > This means that interceptors can potentially modify
> > topic/partition
> > > > in
> > > > > > > ProducerRecord and topic/partition/offset in ConsumerRecord. I
> > > > propose
> > > > > > that
> > > > > > > it is up to the interceptor implementation to ensure that
> > > > > > topic/partition,
> > > > > > > etc is correct. KafkaProducer.send() will use topic, partition,
> > > key,
> > > > > and
> > > > > > > value from ProducerRecord returned from the onSend().
> Similarly,
> > > > > > > ConsumerRecords returned from KafkaConsumer.poll() would be the
> > > ones
> > > > > > > returned from the interceptor.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > Please let me know if you have any suggestions or objections to
> > the
> > > > > > above.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Anna
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner <
> a...@confluent.io
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Hi Mayuresh,
> > > > > > > >
> > > > > > > > I see why you would want to check for messages left in the
> > > > > > > > RecordAccumulator. However, I don't think this will
> completely
> > > > solve
> > > > > > the
> > > > > > > > problem. Messages could be in-flight somewhere else, like in
> > the
> > > > > > socket,
> > > > > > > or
> > > > > > > > there may be in-flight messages on the consumer side of the
> > > > > MirrorMaker.
> > > > > > > So,
> > > > > > > > if we go the route of checking whether there are any
> in-flight
> > > > > messages
> > > > > > > for
> > > > > > > > topic deletion use-case, maybe it is better to count them with
> > > > onSend()
> > > > > > and
> > > > > > > > onAcknowledgement() -- whether all messages sent were
> > acknowledged. I
> > > > > also
> > > > > > > > think that it would be better to solve this without
> > interceptors,
> > > > > such
> > > > > > as
> > > > > > > > fix error handling in this scenario. However, I do not have
> any
> > > > good
> > > > > > > > proposal right now, so these are just general thoughts.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Anna
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <
> > > > > > > > gharatmayures...@gmail.com> wrote:
> > > > > > > >
> > > > > > > >> Calling producer.flush(), flushes all the data. So this is
> OK.
> > > But
> > > > > > when
> > > > > > > >> you
> > > > > > > >> are running Mirror maker, I am not sure there is a way to
> > > flush()
> > > > > from
> > > > > > > >> outside.
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >>
> > > > > > > >> Mayuresh
> > > > > > > >>
> > > > > > > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin <
> > > > becket....@gmail.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> > Mayuresh,
> > > > > > > >> >
> > > > > > > >> > Regarding your use case about mirror maker. Is it good
> > enough
> > > as
> > > > > > long
> > > > > > > >> as we
> > > > > > > >> > know there is no message for the topic in the producer
> > > anymore?
> > > > If
> > > > > > > that
> > > > > > > >> is
> > > > > > > >> > the case, calling producer.flush() is sufficient.
> > > > > > > >> >
> > > > > > > >> > Thanks,
> > > > > > > >> >
> > > > > > > >> > Jiangjie (Becket) Qin
> > > > > > > >> >
> > > > > > > >> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <
> > > > > > > >> > gharatmayures...@gmail.com
> > > > > > > >> > > wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi Anna,
> > > > > > > >> > >
> > > > > > > >> > > Thanks a lot for summarizing the discussion on this kip.
> > > > > > > >> > >
> > > > > > > >> > > It LGTM.
> > > > > > > >> > > This is really nice :
> > > > > > > >> > > We decided not to add any callbacks to producer and
> > consumer
> > > > > > > >> > > interceptors that will depend on internal implementation
> > as
> > > > part
> > > > > > of
> > > > > > > >> this
> > > > > > > >> > > KIP.
> > > > > > > >> > > *However, it is possible to add them later as part of
> > > another
> > > > > KIP
> > > > > > if
> > > > > > > >> > there
> > > > > > > >> > > are good use-cases.*
> > > > > > > >> > >
> > > > > > > >> > > Do you agree with the use case I explained earlier for
> > > knowing
> > > > > the
> > > > > > > >> number
> > > > > > > >> > > of records left in the RecordAccumulator for a
> particular
> > > > topic.
> > > > > > It
> > > > > > > >> might
> > > > > > > >> > > be orthogonal to this KIP, but will be helpful. What do
> > you
> > > > > think?
> > > > > > > >> > >
> > > > > > > >> > > Thanks,
> > > > > > > >> > >
> > > > > > > >> > > Mayuresh
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino <
> > > > tpal...@gmail.com
> > > > > >
> > > > > > > >> wrote:
> > > > > > > >> > >
> > > > > > > >> > > > This looks good. As noted, having one mutable
> > interceptor
> > > on
> > > > > > each
> > > > > > > >> side
> > > > > > > >> > > > allows for the use cases we can envision right now,
> and
> > I
> > > > > think
> > > > > > > >> that’s
> > > > > > > >> > > > going to provide a great deal of opportunity for
> > > > implementing
> > > > > > > things
> > > > > > > >> > like
> > > > > > > >> > > > audit, especially within a multi-tenant environment.
> > > Looking
> > > > > > > >> forward to
> > > > > > > >> > > > getting this available in the clients.
> > > > > > > >> > > >
> > > > > > > >> > > > Thanks!
> > > > > > > >> > > >
> > > > > > > >> > > > -Todd
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner <
> > > > > > a...@confluent.io>
> > > > > > > >> > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Hi All,
> > > > > > > >> > > > >
> > > > > > > >> > > > > Here are the meeting notes from today’s KIP meeting:
> > > > > > > >> > > > >
> > > > > > > >> > > > > 1. We agreed to keep the scope of this KIP to be
> > > producer
> > > > > and
> > > > > > > >> > consumer
> > > > > > > >> > > > > interceptors only. Broker-side interceptor will be
> > added
> > > > > later
> > > > > > > as
> > > > > > > >> a
> > > > > > > >> > > > > separate KIP. The reasons were already mentioned in
> > this
> > > > > > thread,
> > > > > > > >> but
> > > > > > > >> > > the
> > > > > > > >> > > > > summary is:
> > > > > > > >> > > > >  * Broker interceptor is riskier and requires
> careful
> > > > > > > >> consideration
> > > > > > > >> > > about
> > > > > > > >> > > > > overheads, whether to intercept leaders vs.
> > > > > leaders/replicas,
> > > > > > > >> what to
> > > > > > > >> > > do
> > > > > > > >> > > > on
> > > > > > > >> > > > > leader failover and so on.
> > > > > > > >> > > > >  * Broker interceptors increase monitoring
> resolution,
> > > but
> > > > > not
> > > > > > > >> > > including
> > > > > > > >> > > > it
> > > > > > > >> > > > > in this KIP does not reduce usefulness of producer
> and
> > > > > > consumer
> > > > > > > >> > > > > interceptors that enable end-to-end monitoring
> > > > > > > >> > > > >
> > > > > > > >> > > > > 2. We agreed to scope ProducerInterceptor and
> > > > > > > ConsumerInterceptor
> > > > > > > >> > > > callbacks
> > > > > > > >> > > > > to minimal set of mutable API that are not dependent
> > on
> > > > > > producer
> > > > > > > >> and
> > > > > > > >> > > > > consumer internal implementation.
> > > > > > > >> > > > >
> > > > > > > >> > > > > ProducerInterceptor:
> > > > > > > >> > > > > *ProducerRecord<K, V> onSend(ProducerRecord<K, V>
> > > > record);*
> > > > > > > >> > > > > *void onAcknowledgement(RecordMetadata metadata,
> > > Exception
> > > > > > > >> > exception);*
> > > > > > > >> > > > >
> > > > > > > >> > > > > ConsumerInterceptor:
> > > > > > > >> > > > > *ConsumerRecords<K, V> onConsume(ConsumerRecords<K,
> V>
> > > > > > > records);*
> > > > > > > >> > > > > *void onCommit(Map<TopicPartition,
> OffsetAndMetadata>
> > > > > > offsets);*
> > > > > > > >> > > > >
> > > > > > > >> > > > > We will allow interceptors to modify ProducerRecord
> on
> > > > > > producer
> > > > > > > >> side,
> > > > > > > >> > > and
> > > > > > > >> > > > > modify ConsumerRecords on consumer side. This will
> > > support
> > > > > > > >> end-to-end
> > > > > > > >> > > > > monitoring and auditing and support the ability to
> add
> > > > > > metadata
> > > > > > > >> for a
> > > > > > > >> > > > > message. This will support Todd’s Auditing and
> Routing
> > > > > > > use-cases.
> > > > > > > >> > > > >
> > > > > > > >> > > > > We did not find any use-case for modifying records
> in
> > > > > > > onConsume()
> > > > > > > >> > > > callback,
> > > > > > > >> > > > > but decided to enable modification of consumer
> records
> > > for
> > > > > > > >> symmetry
> > > > > > > >> > > with
> > > > > > > >> > > > > onSend().
> > > > > > > >> > > > >
> > > > > > > >> > > > > 3. We agreed to ensure compatibility when/if we add
> > new
> > > > > > methods
> > > > > > > to
> > > > > > > >> > > > > ProducerInterceptor and ConsumerInterceptor by using
> > > > default
> > > > > > > >> methods
> > > > > > > >> > > with
> > > > > > > >> > > > > an empty implementation. Ok to assume Java 8. (This
> is
> > > > > > Ismael’s
> > > > > > > >> > method
> > > > > > > >> > > > #2).
> > > > > > > >> > > > >
> > > > > > > >> > > > > 4. We decided not to add any callbacks to producer
> and
> > > > > > consumer
> > > > > > > >> > > > > interceptors that will depend on internal
> > implementation
> > > > as
> > > > > > part
> > > > > > > >> of
> > > > > > > >> > > this
> > > > > > > >> > > > > KIP. However, it is possible to add them later as
> part
> > > of
> > > > > > > another
> > > > > > > >> KIP
> > > > > > > >> > > if
> > > > > > > >> > > > > there are good use-cases.
> > > > > > > >> > > > >
> > > > > > > >> > > > > *Reasoning.* We did not have concrete use-cases that
> > > > > justified
> > > > > > > >> more
> > > > > > > >> > > > methods
> > > > > > > >> > > > > at this point. Some of the use-cases were for more
> > > > > fine-grain
> > > > > > > >> latency
> > > > > > > >> > > > > collection, which could be done with Kafka Metrics.
> > > > Another
> > > > > > > >> use-case
> > > > > > > >> > > was
> > > > > > > >> > > > > encryption. However, there are several design
> options
> > > for
> > > > > > > >> encryption.
> > > > > > > >> > > One
> > > > > > > >> > > > > is to do per-record encryption which would require
> > > adding
> > > > > > > >> > > > > ProducerInterceptor.onEnqueued() and
> > > > > > > >> ConsumerInterceptor.onReceive().
> > > > > > > >> > > One
> > > > > > > >> > > > > could argue that in that case encryption could be
> done
> > > by
> > > > > > > adding a
> > > > > > > >> > > custom
> > > > > > > >> > > > > serializer/deserializer. Another option is to do
> > > > encryption
> > > > > > > after
> > > > > > > >> > > message
> > > > > > > >> > > > > gets compressed, but there are issues that arise
> > > regarding
> > > > > > > broker
> > > > > > > >> > doing
> > > > > > > >> > > > > re-compression. We decided that it is better to have
> > > that
> > > > > > > >> discussion
> > > > > > > >> > > in a
> > > > > > > >> > > > > separate KIP and decide that this is something we
> want
> > > to
> > > > do
> > > > > > > with
> > > > > > > >> > > > > interceptors or by other means.
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > > Todd, Mayuresh and others who missed the KIP
> meeting,
> > > > please
> > > > > > let
> > > > > > > >> me
> > > > > > > >> > > know
> > > > > > > >> > > > > your thoughts on the scope we agreed on during the
> > > > meeting.
> > > > > > > >> > > > >
> > > > > > > >> > > > > I will update the KIP proposal with the current
> > decision
> > > > by
> > > > > > end
> > > > > > > of
> > > > > > > >> > > today.
> > > > > > > >> > > > >
> > > > > > > >> > > > > Thanks,
> > > > > > > >> > > > > Anna
> > > > > > > >> > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > > > On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <
> > > > > > > >> > > > > gharatmayures...@gmail.com> wrote:
> > > > > > > >> > > > >
> > > > > > > >> > > > > > Hi,
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > I won't be able to make it to KIP hangout due to
> > > > conflict.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Anna, here is the use case where knowing if there
> > are
> > > > > > messages
> > > > > > > >> in
> > > > > > > >> > the
> > > > > > > >> > > > > > RecordAccumulator left to be sent to the kafka
> > cluster
> > > > > for a
> > > > > > > >> topic
> > > > > > > >> > is
> > > > > > > >> > > > > > useful.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > 1) Consider a pipeline :
> > > > > > > >> > > > > > A ---> Mirror-maker -----> B
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > 2) We have a topic T in cluster A mirrored to
> > cluster
> > > B.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > 3) Now if we delete topic T in A and immediately
> > > proceed
> > > > > to
> > > > > > > >> delete
> > > > > > > >> > > the
> > > > > > > >> > > > > > topic in cluster B, some of the Mirror-maker
> > > > machines
> > > > > > die
> > > > > > > >> > because
> > > > > > > >> > > > > > at least one of the batches in RecordAccumulator
> for
> > > > topic
> > > > > T
> > > > > > > >> fails to
> > > > > > > >> > > be
> > > > > > > >> > > > > > produced to cluster B. We have seen this happening
> > in
> > > > our
> > > > > > > >> clusters.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > If we know that there are no more messages left in
> > the
> > > > > > > >> > > > RecordAccumulator
> > > > > > > >> > > > > to
> > > > > > > >> > > > > > be produced to cluster B, we can safely delete the
> > > topic
> > > > > in
> > > > > > > >> > cluster B
> > > > > > > >> > > > > > without disturbing the pipeline.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Thanks,
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Mayuresh
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner <
> > > > > > > >> a...@confluent.io>
> > > > > > > >> > > > > wrote:
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > > Thanks Ismael and Todd for your feedback!
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > I agree about coming up with lean, but useful
> > > > interfaces
> > > > > > > that
> > > > > > > >> > will
> > > > > > > >> > > be
> > > > > > > >> > > > > > easy
> > > > > > > >> > > > > > > to extend later.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > When we discuss the minimal set of producer and
> > > > consumer
> > > > > > > >> > > interceptor
> > > > > > > >> > > > > API
> > > > > > > >> > > > > > in
> > > > > > > >> > > > > > > today’s KIP meeting (discussion item #2 in my
> > > previous
> > > > > > > email),
> > > > > > > >> > let's
> > > > > > > >> > > > > > compare
> > > > > > > >> > > > > > > two options:
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > *1. Minimal set of immutable API for producer
> and
> > > > > consumer
> > > > > > > >> > > > > interceptors*
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > ProducerInterceptor:
> > > > > > > >> > > > > > > public void onSend(ProducerRecord<K, V> record);
> > > > > > > >> > > > > > > public void onAcknowledgement(RecordMetadata
> > > metadata,
> > > > > > > >> Exception
> > > > > > > >> > > > > > > exception);
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > ConsumerInterceptor:
> > > > > > > >> > > > > > > public void onConsume(ConsumerRecords<K, V>
> > > records);
> > > > > > > >> > > > > > > public void onCommit(Map<TopicPartition,
> > > > > > OffsetAndMetadata>
> > > > > > > >> > > offsets);
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Use-cases:
> > > > > > > >> > > > > > > — end-to-end monitoring; custom tracing and
> > logging
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > *2. Minimal set of mutable API for producer and
> > > > consumer
> > > > > > > >> > > > interceptors*
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > ProducerInterceptor:
> > > > > > > >> > > > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V>
> > > > > record);
> > > > > > > >> > > > > > > void onAcknowledgement(RecordMetadata metadata,
> > > > > Exception
> > > > > > > >> > > exception);
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > ConsumerInterceptor:
> > > > > > > >> > > > > > > void onConsume(ConsumerRecords<K, V> records);
> > > > > > > >> > > > > > > void onCommit(Map<TopicPartition,
> > OffsetAndMetadata>
> > > > > > > offsets);
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Additional use-cases to #1:
> > > > > > > >> > > > > > > — Ability to add metadata to a message or fill
> in
> > > > > standard
> > > > > > > >> fields
> > > > > > > >> > > for
> > > > > > > >> > > > > > audit
> > > > > > > >> > > > > > > and routing.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Implications
> > > > > > > >> > > > > > > — Partition assignment will be done based on
> > > modified
> > > > > > > >> key/value
> > > > > > > >> > > > instead
> > > > > > > >> > > > > > of
> > > > > > > >> > > > > > > original key/value. If key/value transformation
> is
> > > not
> > > > > > > >> consistent
> > > > > > > >> > > > (same
> > > > > > > >> > > > > > key
> > > > > > > >> > > > > > > and value does not mutate to the same, but
> > modified,
> > > > > > > >> key/value),
> > > > > > > >> > > then
> > > > > > > >> > > > > log
> > > > > > > >> > > > > > > compaction would not work. However, audit and
> > > routing
> > > > > > > >> use-cases
> > > > > > > >> > > from
> > > > > > > >> > > > > Todd
> > > > > > > >> > > > > > > will likely do consistent transformation.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > *Additional callbacks (discussion item #3 in my
> > > > previous
> > > > > > > >> email):*
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > If we want to support encryption, we would want
> to
> > > be
> > > > > able
> > > > > > > to
> > > > > > > >> > > modify
> > > > > > > >> > > > > > > serialized key/value, rather than key and value
> > > > objects.
> > > > > > > This
> > > > > > > >> > will
> > > > > > > >> > > > add
> > > > > > > >> > > > > > the
> > > > > > > >> > > > > > > following API to producer and consumer
> > interceptors:
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > ProducerInterceptor:
> > > > > > > >> > > > > > > SerializedKeyValue onEnqueued(TopicPartition tp,
> > > > > > > >> > ProducerRecord<K,
> > > > > > > >> > > V>
> > > > > > > >> > > > > > > record, SerializedKeyValue serializedKeyValue);
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > ConsumerInterceptor:
> > > > > > > >> > > > > > > SerializedKeyValue onReceive(TopicPartition tp,
> > > > > > > >> > SerializedKeyValue
> > > > > > > >> > > > > > > serializedKeyValue);
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > I am leaning towards implementing the minimal
> set
> > of
> > > > > > > >> immutable or
> > > > > > > >> > > > > mutable
> > > > > > > >> > > > > > > interfaces, making sure that we have a
> > compatibility
> > > > > plan
> > > > > > > that
> > > > > > > >> > > allows
> > > > > > > >> > > > > us
> > > > > > > >> > > > > > to
> > > > > > > >> > > > > > > add more callbacks in the future (per Ismael
> > > comment),
> > > > > and
> > > > > > > add
> > > > > > > >> > more
> > > > > > > >> > > > > APIs
> > > > > > > >> > > > > > > later. E.g., for encryption use-case, there
> could
> > be
> > > > an
> > > > > > > >> argument
> > > > > > > >> > in
> > > > > > > >> > > > > doing
> > > > > > > >> > > > > > > encryption after message compression vs.
> > per-record
> > > > > > > encryption
> > > > > > > >> > that
> > > > > > > >> > > > > could
> > > > > > > >> > > > > > > be done using the above additional API. There are
> > > also
> > > > > more
> > > > > > > >> > > > implications
> > > > > > > >> > > > > > for
> > > > > > > >> > > > > > > every API that modifies records: modifying
> > > serialized
> > > > > > > >> key/value
> > > > > > > >> > > will
> > > > > > > >> > > > > > again
> > > > > > > >> > > > > > > impact partition assignment (we will likely do
> > that
> > > > > after
> > > > > > > >> > partition
> > > > > > > >> > > > > > > assignment), which may impact log compaction and
> > > > mirror
> > > > > > > maker
> > > > > > > >> > > > > > partitioning.
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > Thanks,
> > > > > > > >> > > > > > > Anna
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino <
> > > > > > > >> tpal...@gmail.com>
> > > > > > > >> > > > > wrote:
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > > > > Finally got a chance to take a look at this. I
> > > won’t
> > > > > be
> > > > > > > >> able to
> > > > > > > >> > > > make
> > > > > > > >> > > > > > the
> > > > > > > >> > > > > > > > KIP meeting due to a conflict.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > I’m somewhat disappointed in this proposal. I
> > > think
> > > > > that
> > > > > > > the
> > > > > > > >> > > > explicit
> > > > > > > >> > > > > > > > exclusion of modification of the messages is
> > > > > > > short-sighted,
> > > > > > > >> and
> > > > > > > >> > > not
> > > > > > > >> > > > > > > > accounting for it now is going to bite us
> later.
> > > > Jay,
> > > > > > > aren’t
> > > > > > > >> > you
> > > > > > > >> > > > the
> > > > > > > >> > > > > > one
> > > > > > > >> > > > > > > > railing against public interfaces and how
> > > difficult
> > > > > they
> > > > > > > >> are to
> > > > > > > >> > > > work
> > > > > > > >> > > > > > with
> > > > > > > >> > > > > > > > when you don’t get them right? The “simple”
> > change
> > > > to
> > > > > > one
> > > > > > > of
> > > > > > > >> > > these
> > > > > > > >> > > > > > > > interfaces to make it able to return a record
> is
> > > > going
> > > > > > to
> > > > > > > >> be a
> > > > > > > >> > > > > > > significant
> > > > > > > >> > > > > > > > change and is going to require all clients to
> > > > rewrite
> > > > > > > their
> > > > > > > >> > > > > > interceptors.
> > > > > > > >> > > > > > > > If we’re not willing to put the time to think
> > > > through
> > > > > > > >> > > manipulation
> > > > > > > >> > > > > now,
> > > > > > > >> > > > > > > > then this KIP should be shelved until we are.
> > > > > > Implementing
> > > > > > > >> > > > something
> > > > > > > >> > > > > > > > halfway is going to be worse than taking a
> > little
> > > > > > longer.
> > > > > > > In
> > > > > > > >> > > > > addition,
> > > > > > > >> > > > > > I
> > > > > > > >> > > > > > > > don’t believe that manipulation requires
> > anything
> > > > more
> > > > > > > than
> > > > > > > >> > > > > > interceptors
> > > > > > > >> > > > > > > to
> > > > > > > >> > > > > > > > receive the full record, and then to return
> it.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > There are 3 use cases I can think of right now
> > > > without
> > > > > > any
> > > > > > > >> deep
> > > > > > > >> > > > > > discussion
> > > > > > > >> > > > > > > > that can make use of interceptors with
> > > modification:
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > 1. Auditing. The ability to add metadata to a
> > > > message
> > > > > > for
> > > > > > > >> > > auditing
> > > > > > > >> > > > is
> > > > > > > >> > > > > > > > critical. Hostname, service name, timestamps,
> > etc.
> > > > are
> > > > > > all
> > > > > > > >> > pieces
> > > > > > > >> > > > of
> > > > > > > >> > > > > > data
> > > > > > > >> > > > > > > > that can be used on the other side of the
> > pipeline
> > > > to
> > > > > > > >> > categorize
> > > > > > > >> > > > > > > messages,
> > > > > > > >> > > > > > > > determine loss and transport time, and pin
> down
> > > > > issues.
> > > > > > > You
> > > > > > > >> may
> > > > > > > >> > > say
> > > > > > > >> > > > > > that
> > > > > > > >> > > > > > > > these things can just be part of the message
> > > schema,
> > > > > but
> > > > > > > >> anyone
> > > > > > > >> > > who
> > > > > > > >> > > > > has
> > > > > > > >> > > > > > > > worked with a multi-user data system
> (especially
> > > > those
> > > > > > who
> > > > > > > >> have
> > > > > > > >> > > > been
> > > > > > > >> > > > > > > > involved with LinkedIn) know how difficult it
> is
> > > to
> > > > > > > maintain
> > > > > > > >> > > > > consistent
> > > > > > > >> > > > > > > > message schemas and to get other people to put
> > in
> > > > > fields
> > > > > > > for
> > > > > > > >> > your
> > > > > > > >> > > > > use.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > 2. Encryption. This is probably the most
> obvious
> > > > case
> > > > > > for
> > > > > > > >> > record
> > > > > > > >> > > > > > > > manipulation on both sides. The ability to tie
> > in
> > > > end
> > > > > to
> > > > > > > end
> > > > > > > >> > > > > encryption
> > > > > > > >> > > > > > > is
> > > > > > > >> > > > > > > > important for data that requires external
> > > compliance
> > > > > > (PCI,
> > > > > > > >> > HIPAA,
> > > > > > > >> > > > > > etc.).
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > 3. Routing. By being able to add a bit of
> > > > information
> > > > > > > about
> > > > > > > >> the
> > > > > > > >> > > > > source
> > > > > > > >> > > > > > or
> > > > > > > >> > > > > > > > destination of a message to the metadata, you
> > can
> > > > > easily
> > > > > > > >> > > construct
> > > > > > > >> > > > an
> > > > > > > >> > > > > > > > intelligent mirror maker that can prevent
> loops.
> > > > This
> > > > > > has
> > > > > > > >> the
> > > > > > > >> > > > > > opportunity
> > > > > > > >> > > > > > > > to result in significant operational savings,
> as
> > > you
> > > > > can
> > > > > > > get
> > > > > > > >> > rid
> > > > > > > >> > > of
> > > > > > > >> > > > > the
> > > > > > > >> > > > > > > > need for tiered clusters in order to prevent
> > loops
> > > > in
> > > > > > > >> mirroring
> > > > > > > >> > > > > > messages.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > All three of these share the feature that they
> > add
> > > > > > > metadata
> > > > > > > >> to
> > > > > > > >> > > > > > messages.
> > > > > > > >> > > > > > > > With the pushback on having arbitrary metadata
> > as
> > > an
> > > > > > > >> “envelope”
> > > > > > > >> > > to
> > > > > > > >> > > > > the
> > > > > > > >> > > > > > > > message, this is a way to provide it and make
> it
> > > the
> > > > > > > >> > > responsibility
> > > > > > > >> > > > > of
> > > > > > > >> > > > > > > the
> > > > > > > >> > > > > > > > client, and not the Kafka broker and system
> > > itself.
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > -Todd
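
A minimal sketch of the point above, that manipulation needs no more than
an interceptor that receives the full record and returns one. The Message
type, the MutatingInterceptor interface, and the audit.* keys are
hypothetical names for illustration, not the KIP-42 API:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical record type: a payload plus free-form metadata. */
final class Message {
    final String topic;
    final byte[] value;
    final Map<String, String> metadata;

    Message(String topic, byte[] value, Map<String, String> metadata) {
        this.topic = topic;
        this.value = value;
        // Defensive copy so an interceptor cannot mutate its input in place.
        this.metadata = Collections.unmodifiableMap(new HashMap<>(metadata));
    }
}

/** Hypothetical interceptor shape: receive the full record, return a record. */
interface MutatingInterceptor {
    Message onSend(Message message);
}

/** Adds audit metadata without touching the payload or the message schema. */
final class AuditInterceptor implements MutatingInterceptor {
    private final String hostname;
    private final String service;

    AuditInterceptor(String hostname, String service) {
        this.hostname = hostname;
        this.service = service;
    }

    @Override
    public Message onSend(Message message) {
        Map<String, String> metadata = new HashMap<>(message.metadata);
        metadata.put("audit.host", hostname);
        metadata.put("audit.service", service);
        // Return a new record; the caller sends whatever comes back.
        return new Message(message.topic, message.value, metadata);
    }
}
```

Encryption and routing would follow the same shape: the first replaces the
value bytes, the second adds a source or destination key to the metadata.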
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > > On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > > Hi Anna and Neha,
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > > I think it makes a lot of sense to try and keep the interface lean and
> > > > > > > >> > > > > > > > > > to add more methods later when/if there is a need. What is the current
> > > > > > > >> > > > > > > > > > thinking with regards to compatibility when/if we add new methods? A
> > > > > > > >> > > > > > > > > > few options come to mind:
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > 1. Change the interface to an abstract class with empty
> > > > > > > >> > > > > > > > > > implementations for all the methods. This means that the path to
> > > > > > > >> > > > > > > > > > adding new methods is clear.
> > > > > > > >> > > > > > > > > > 2. Hope we have moved to Java 8 by the time we need to add new methods
> > > > > > > >> > > > > > > > > > and use default methods with an empty implementation for any new
> > > > > > > >> > > > > > > > > > method (and potentially make existing methods default methods too at
> > > > > > > >> > > > > > > > > > that point for consistency)
> > > > > > > >> > > > > > > > > > 3. Introduce a new interface that inherits from the existing
> > > > > > > >> > > > > > > > > > Interceptor interface when we need to add new methods.
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > Option 1 is the easiest and it also means that interceptor users only
> > > > > > > >> > > > > > > > > > need to override the methods that they are interested in (more useful
> > > > > > > >> > > > > > > > > > if the number of methods grows). The downside is that interceptor
> > > > > > > >> > > > > > > > > > implementations cannot inherit from another class (a straightforward
> > > > > > > >> > > > > > > > > > workaround is to make the interceptor a forwarder that calls another
> > > > > > > >> > > > > > > > > > class). Also, our existing callbacks are interfaces, so it seems a bit
> > > > > > > >> > > > > > > > > > inconsistent.
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > Option 2 may be the most appealing one as both users and ourselves
> > > > > > > >> > > > > > > > > > retain flexibility. The main downside is that it relies on us moving
> > > > > > > >> > > > > > > > > > to Java 8, which may be more than a year away potentially (if we
> > > > > > > >> > > > > > > > > > support the last 2 Java releases).
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > Thoughts?
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > Ismael
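
The three compatibility options can be sketched side by side in Java. All
type and method names here are hypothetical stand-ins, not the KIP-42
interfaces, and records are reduced to strings to keep the sketch small:

```java
/** Option 1: abstract class with empty implementations. */
abstract class AbstractInterceptor {
    public void onSend(String record) { }

    // A hook added in a later release is just another no-op method,
    // so subclasses written earlier keep compiling.
    public void onAcknowledgement(String record) { }
}

/** Option 2: interface whose later-added hook is a Java 8 default method. */
interface DefaultingInterceptor {
    void onSend(String record);

    default void onAcknowledgement(String record) { }
}

/** Option 3: the original interface stays frozen ... */
interface BasicInterceptor {
    void onSend(String record);
}

/** ... and new hooks live in a sub-interface. */
interface AckAwareInterceptor extends BasicInterceptor {
    void onAcknowledgement(String record);
}

/** Written against the single-method shape; compiles under option 2 unchanged. */
final class SendCounter implements DefaultingInterceptor {
    int sends;

    @Override
    public void onSend(String record) {
        sends++;
    }
}

/** With option 3 the caller has to check which interface it was handed. */
final class Dispatcher {
    static boolean acknowledge(BasicInterceptor interceptor, String record) {
        if (interceptor instanceof AckAwareInterceptor) {
            ((AckAwareInterceptor) interceptor).onAcknowledgement(record);
            return true;
        }
        return false; // older interceptor: nothing to call
    }
}
```

The instanceof check in Dispatcher is the cost of option 3; options 1 and 2
avoid it, at the price of single inheritance or a Java 8 requirement
respectively.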
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > > On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede <n...@confluent.io> wrote:
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > > Anna,
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > > I'm also in favor of including just the APIs for which we have a clear
> > > > > > > >> > > > > > > > > > > use case. If more use cases for finer monitoring show up in the
> > > > > > > >> > > > > > > > > > > future, we can always update the interface. Would you please highlight
> > > > > > > >> > > > > > > > > > > in the KIP the APIs that you think we have an immediate use for?
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > Joel,
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > Broker-side monitoring makes a lot of sense in the long term though I
> > > > > > > >> > > > > > > > > > > don't think it is a requirement for end-to-end monitoring. With the
> > > > > > > >> > > > > > > > > > > producer and consumer interceptors, you have the ability to get full
> > > > > > > >> > > > > > > > > > > publish-to-subscribe end-to-end monitoring. The broker interceptor
> > > > > > > >> > > > > > > > > > > certainly improves the resolution of monitoring but it is also a
> > > > > > > >> > > > > > > > > > > riskier change. I prefer an incremental approach over a big-bang and
> > > > > > > >> > > > > > > > > > > recommend taking baby-steps. Let's first make sure the
> > > > > > > >> > > > > > > > > > > producer/consumer interceptors are successful. And then come back and
> > > > > > > >> > > > > > > > > > > add the broker interceptor carefully.
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > Having said that, it would be great to understand your proposal for
> > > > > > > >> > > > > > > > > > > the broker interceptor independently. We can either add an interceptor
> > > > > > > >> > > > > > > > > > > on-append or on-commit. If people want to use this for monitoring,
> > > > > > > >> > > > > > > > > > > then possibly on-commit might be more useful?
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > Thanks,
> > > > > > > >> > > > > > > > > > Neha
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > > On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps <j...@confluent.io> wrote:
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > > Hey Joel,
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > What is the interface you are thinking of? Something like this:
> > > > > > > >> > > > > > > > > > > >     onAppend(String topic, int partition, Records records, long time)
> > > > > > > >> > > > > > > > > > > > ?
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > One challenge right now is that we are still using the old
> > > > > > > >> > > > > > > > > > > > Message/MessageSet classes on the broker which I'm not sure if we'd
> > > > > > > >> > > > > > > > > > > > want to support over the long haul but it might be okay just to create
> > > > > > > >> > > > > > > > > > > > the records instance for this interface.
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > -Jay
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > I'm definitely in favor of having such hooks in the produce/consume
> > > > > > > >> > > > > > > > > > > > > life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> > > > > > > >> > > > > > > > > > > > > pretty much how it was:
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > > > > > > >> > > > > > > > > > > > > i.e., we had something similar to the interceptor proposal for various
> > > > > > > >> > > > > > > > > > > > > stages of the producer request. The producer provided call-backs for
> > > > > > > >> > > > > > > > > > > > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> > > > > > > >> > > > > > > > > > > > > LinkedIn we in fact did auditing within these call-backs (and not
> > > > > > > >> > > > > > > > > > > > > explicitly in the wrapper). Over time and with 0.8 we moved that out
> > > > > > > >> > > > > > > > > > > > > to the wrapper libraries.
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > On a side-note while audit and other monitoring can be done internally
> > > > > > > >> > > > > > > > > > > > > in a convenient way I think it should be clarified that having a
> > > > > > > >> > > > > > > > > > > > > wrapper is in general not a bad idea and I would even consider it to
> > > > > > > >> > > > > > > > > > > > > be a best-practice. Even with 0.7 we still had a wrapper library and
> > > > > > > >> > > > > > > > > > > > > that API has largely stayed the same and has helped protect against
> > > > > > > >> > > > > > > > > > > > > (sometimes backwards incompatible) changes in open source.
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > While we are on this topic I have one comment and Anna, you may have
> > > > > > > >> > > > > > > > > > > > > already considered this but I don't see mention of it in the KIP:
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > Add a custom message interceptor/validator on the broker on message
> > > > > > > >> > > > > > > > > > > > > arrival.
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > We decompress and do basic validation of messages on arrival. I think
> > > > > > > >> > > > > > > > > > > > > there is value in supporting custom validation and expand it to
> > > > > > > >> > > > > > > > > > > > > support custom on-arrival processing. Here is a specific use-case I
> > > > > > > >> > > > > > > > > > > > > have in mind. The blog that James referenced describes our auditing
> > > > > > > >> > > > > > > > > > > > > infrastructure. In order to audit the Kafka cluster itself we need to
> > > > > > > >> > > > > > > > > > > > > run a "console auditor" service that consumes everything and spits out
> > > > > > > >> > > > > > > > > > > > > audit events back to the cluster. I would prefer not having to run
> > > > > > > >> > > > > > > > > > > > > this service because:
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > >    - Well, it is one more service that we have to run and monitor
> > > > > > > >> > > > > > > > > > > > >    - Consuming everything takes up bandwidth which can be avoided
> > > > > > > >> > > > > > > > > > > > >    - The console auditor consumer itself can lag and cause temporary
> > > > > > > >> > > > > > > > > > > > >    audit discrepancies
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > One way we can mitigate this is by having mirror-makers in between
> > > > > > > >> > > > > > > > > > > > > clusters emit audit events. The problem is that the very last cluster
> > > > > > > >> > > > > > > > > > > > > in the pipeline will not have any audit which is why we need to have
> > > > > > > >> > > > > > > > > > > > > something to audit the cluster.
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > If we had a custom message validator then the audit can be done
> > > > > > > >> > > > > > > > > > > > > on-arrival and we won't need a console auditor.
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > One potential issue in this approach and any elaborate on-arrival
> > > > > > > >> > > > > > > > > > > > > processing for that matter is that you may need to deserialize the
> > > > > > > >> > > > > > > > > > > > > message as well which can drive up produce request handling times.
> > > > > > > >> > > > > > > > > > > > > However I'm not terribly concerned about that especially if the audit
> > > > > > > >> > > > > > > > > > > > > header can be separated out easily or even deserialized partially as
> > > > > > > >> > > > > > > > > > > > > this Avro thread touches on
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > Thanks,
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > Joel
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > Nice KIP. Excellent idea.
> > > > > > > >> > > > > > > > > > > > > > Was just thinking if we can add onDequeued() to the ProducerInterceptor
> > > > > > > >> > > > > > > > > > > > > > interface. Since we have the onEnqueued(), it will help the client or
> > > > > > > >> > > > > > > > > > > > > > the tools to know how much time the message spent in the
> > > > > > > >> > > > > > > > > > > > > > RecordAccumulator. Also an API to check if there are any messages left
> > > > > > > >> > > > > > > > > > > > > > for a particular topic in the RecordAccumulator would help.
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > Thanks,
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > Mayuresh
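
A rough sketch of how a pair of enqueue/dequeue hooks could measure time
spent in the accumulator. The class, the record ids, and the explicit
timestamps are all assumptions made for illustration; neither hook
signature comes from the KIP:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that pairs an enqueue hook with a dequeue hook. */
final class AccumulatorTimer {
    // Enqueue time per in-flight record, keyed by a caller-supplied id.
    private final Map<Long, Long> enqueuedAtMs = new HashMap<>();

    synchronized void onEnqueued(long recordId, long nowMs) {
        enqueuedAtMs.put(recordId, nowMs);
    }

    /** Returns milliseconds spent queued, or -1 if the enqueue was never seen. */
    synchronized long onDequeued(long recordId, long nowMs) {
        Long start = enqueuedAtMs.remove(recordId);
        return start == null ? -1L : nowMs - start;
    }

    /** Number of records enqueued but not yet dequeued. */
    synchronized int pending() {
        return enqueuedAtMs.size();
    }
}
```

Passing the clock reading in as an argument keeps the helper deterministic;
a real interceptor would more likely read the time itself.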
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com> wrote:
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > Great idea. I’ve been talking about this for 2 years, and I’m glad
> > > > > > > >> > > > > > > > > > > > > > > someone is finally picking it up. Will take a look at the KIP at some
> > > > > > > >> > > > > > > > > > > > > > > point shortly.
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > -Todd
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> wrote:
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > Hey Becket,
> > > > > > > >> > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > Yeah this is really similar to the callback. The difference is really
> > > > > > > >> > > > > > > > > > > > > > > > in who sets the behavior. The idea of the interceptor is that it
> > > > > > > >> > > > > > > > > > > > > > > > doesn't require any code change in apps so you can globally add
> > > > > > > >> > > > > > > > > > > > > > > > behavior to your Kafka usage without changing app code. Whereas the
> > > > > > > >> > > > > > > > > > > > > > > > callback is added by the app. The idea is to kind of obviate the need
> > > > > > > >> > > > > > > > > > > > > > > > for the wrapper code that e.g. LinkedIn maintains to hold this kind of
> > > > > > > >> > > > > > > > > > > > > > > > stuff.
> > > > > > > >> > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > -Jay
> > > > > > > >> > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket....@gmail.com> wrote:
> > > > > > > >> > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > This could be a useful feature. And I think there are some use cases
> > > > > > > >> > > > > > > > > > > > > > > > > to mutate the data like rejected alternative one mentioned.
> > > > > > > >> > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > I am wondering if there is functional overlapping between
> > > > > > > >> > > > > > > > > > > > > > > > > ProducerInterceptor.onAcknowledgement() and the producer callback? I
> > > > > > > >> > > > > > > > > > > > > > > > > can see that the Callback could be a per record setting while
> > > > > > > >> > > > > > > > > > > > > > > > > onAcknowledgement() is a producer level setting. Other than that, is
> > > > > > > >> > > > > > > > > > > > > > > > > there any difference between them?
> > > > > > > >> > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > Thanks,
> > > > > > > >> > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > >> > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <n...@confluent.io> wrote:
> > > > > > > >> > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > James,
> > > > > > > >> > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > That is one of the many monitoring use cases for the interceptor
> > > > > > > >> > > > > > > > > > > > > > > > > > interface.
> > > > > > > >> > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > >> > > > > > > > > > > > > > > > > Neha
> > > > > > > >> > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com> wrote:
> > > > > > > >> > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > Anna,
> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > > I'm trying to understand a concrete use case. It sounds like producer
> > > > > > > >> > > > > > > > > > > > > > > > > > > interceptors could be used to implement part of LinkedIn's Kafka Audit
> > > > > > > >> > > > > > > > > > > > > > > > > > > tool?
> > > > > > > >> > > > > > > > > > > > > > > > > > > https://engineering.linkedin.com/kafka/running-kafka-scale
> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > > Part of that is done by a wrapper library around the kafka producer
> > > > > > > >> > > > > > > > > > > > > > > > > > > that keeps a count of the number of messages produced, and then sends
> > > > > > > >> > > > > > > > > > > > > > > > > > > that count to a side-topic. It sounds like the producer interceptors
> > > > > > > >> > > > > > > > > > > > > > > > > > > could possibly be used to implement that?
> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > -James
> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > > Hi,
> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > > I just created a KIP-42 for adding producer and consumer
> > > > > > > >> > > > > > > > > > > > > > > > > > > interceptors for intercepting messages at different points on
> > > > > > > >> > > > > > > > > > > > > > > > > > > producer and consumer.
> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > > Comments and suggestions are welcome!
> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > >> > > > > > > > > > > > > > > > > > > Anna
> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > > > --
> > > > > > > >> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > >> > > > > > > > > > > > > > > > > Neha
> > > > > > > >> > > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > --
> > > > > > > >> > > > > > > > > > > > > > *—-*
> > > > > > > >> > > > > > > > > > > > > > *Todd Palino*
> > > > > > > >> > > > > > > > > > > > > > Staff Site Reliability Engineer
> > > > > > > >> > > > > > > > > > > > > > Data Infrastructure Streaming
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > > linkedin.com/in/toddpalino
> > > > > > > >> > > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > > > --
> > > > > > > >> > > > > > > > > > > > > -Regards,
> > > > > > > >> > > > > > > > > > > > > Mayuresh R. Gharat
> > > > > > > >> > > > > > > > > > > > > (862) 250-7125
> > > > > > > >> > > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > > >
> > > > > > > >> > > > > > > > > > >
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > > > --
> > > > > > > >> > > > > > > > > > Thanks,
> > > > > > > >> > > > > > > > > > Neha
> > > > > > > >> > > > > > > > > >
> > > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > --
> > > > > > > >> > > > > > > > *—-*
> > > > > > > >> > > > > > > > *Todd Palino*
> > > > > > > >> > > > > > > > Staff Site Reliability Engineer
> > > > > > > >> > > > > > > > Data Infrastructure Streaming
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > > > linkedin.com/in/toddpalino
> > > > > > > >> > > > > > > >
> > > > > > > >> > > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > --
> > > > > > > >> > > > > > -Regards,
> > > > > > > >> > > > > > Mayuresh R. Gharat
> > > > > > > >> > > > > > (862) 250-7125
> > > > > > > >> > > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > > --
> > > > > > > >> > > > *—-*
> > > > > > > >> > > > *Todd Palino*
> > > > > > > >> > > > Staff Site Reliability Engineer
> > > > > > > >> > > > Data Infrastructure Streaming
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > > linkedin.com/in/toddpalino
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > --
> > > > > > > >> > > -Regards,
> > > > > > > >> > > Mayuresh R. Gharat
> > > > > > > >> > > (862) 250-7125
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> --
> > > > > > > >> -Regards,
> > > > > > > >> Mayuresh R. Gharat
> > > > > > > >> (862) 250-7125
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
>
> --
> *—-*
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>
