On second thought, yes, I think we should expose a record size that
represents application bytes. This is Becket's option #1.

I updated the KIP wiki with new fields in RecordMetadata and ConsumerRecord.

I would like to start a voting thread tomorrow if there are no objections
or more things to discuss.

Thanks,
Anna



On Thu, Jan 28, 2016 at 2:25 PM, Anna Povzner <a...@confluent.io> wrote:

> Regarding record size as bytes sent over the wire, my concern is that it
> is almost impossible to calculate per record. We could either 1) divide
> the compressed bytes by the number of records in a compressed message, as
> Todd mentioned; or 2) do the same as #1 but split the compressed bytes in
> proportion to each record's uncompressed size relative to the total
> uncompressed size of the records. Both calculations give us only an
> estimate. So maybe record size as bytes sent over the wire is not
> per-record metadata, but rather a per-topic/partition measure that is
> better exposed through metrics?
>
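The sketch below shows the two estimates above. It assumes the only inputs are the compressed size of one wrapper message and, for option 2, the uncompressed size of each record inside it. `WireSizeEstimator` and its methods are hypothetical names, not part of the Kafka client API. Both methods return estimates, which is exactly the limitation under discussion.

```java
import java.util.Arrays;

/**
 * Hypothetical helper: attributes the compressed size of one wrapper message
 * back to the records inside it. Both results are estimates only.
 */
final class WireSizeEstimator {

    private WireSizeEstimator() {}

    /** Option 1: split the compressed bytes evenly across the records. */
    static double[] evenSplit(long compressedBatchBytes, int recordCount) {
        if (recordCount <= 0) {
            throw new IllegalArgumentException("recordCount must be positive");
        }
        double[] estimates = new double[recordCount];
        Arrays.fill(estimates, (double) compressedBatchBytes / recordCount);
        return estimates;
    }

    /**
     * Option 2: split the compressed bytes in proportion to each record's
     * uncompressed size relative to the total uncompressed size of the batch.
     */
    static double[] proportionalSplit(long compressedBatchBytes, long[] uncompressedSizes) {
        long totalUncompressed = 0;
        for (long size : uncompressedSizes) {
            totalUncompressed += size;
        }
        if (totalUncompressed == 0) {
            // Nothing to weight by, so fall back to option 1.
            return evenSplit(compressedBatchBytes, uncompressedSizes.length);
        }
        double[] estimates = new double[uncompressedSizes.length];
        for (int i = 0; i < uncompressedSizes.length; i++) {
            estimates[i] = compressedBatchBytes * (double) uncompressedSizes[i] / totalUncompressed;
        }
        return estimates;
    }
}
```

Take a 300-byte compressed message holding records of 100, 200 and 300 uncompressed bytes. Option 1 attributes 100 bytes to each record. Option 2 attributes 50, 100 and 150 bytes. Neither figure is what actually crossed the wire for a given record.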
>
> On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino <tpal...@gmail.com> wrote:
>
>> It may be difficult (or nearly impossible) to get actual compressed bytes
>> for a message from a compressed batch, but I do think it’s useful
>> information to have available for the very reason noted, bandwidth
>> consumed. Does it make sense to have an interceptor at the batch level
>> that can provide this? The other option is to estimate it (such as making
>> an assumption that the messages in a batch are equal in size, which is not
>> necessarily true), which is probably not the right answer.
>>
>> -Todd
>>
>>
>> On Thu, Jan 28, 2016 at 1:48 PM, Anna Povzner <a...@confluent.io> wrote:
>>
>> > Hi Becket,
>> >
>> > It will be up to the interceptor to implement their audit or monitoring
>> > strategy. I would also imagine there is more than one good way to do
>> > audit. So, I agree that some of the interceptors may not use CRC, and we
>> > will not require it. The question now is whether intercepting CRCs is
>> > needed. I think they are very useful for monitoring and audit, because a
>> > CRC provides an easy way to get a summary of a message, rather than using
>> > message bytes or key/value objects.
>> >
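The sketch below makes the "summary of a message" idea concrete. It assumes an auditor can collect one checksum per record on each side. `CrcAudit` is hypothetical. It hashes only the serialized key and value with `java.util.zip.CRC32`, whereas Kafka's own record CRC is computed over more fields of the stored record. Identical records collapse into one entry and CRC collisions are possible, so this is a coarse check, not proof of delivery.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Set;
import java.util.zip.CRC32;

/**
 * Hypothetical audit helper: summarizes each record as a CRC32 value and
 * compares the producer-side set with the consumer-side set.
 */
final class CrcAudit {
    private final Set<Long> produced = new HashSet<>();
    private final Set<Long> consumed = new HashSet<>();

    /** Checksum over serialized key then value; null parts are skipped. */
    static long checksum(byte[] key, byte[] value) {
        CRC32 crc = new CRC32();
        if (key != null) {
            crc.update(key, 0, key.length);
        }
        if (value != null) {
            crc.update(value, 0, value.length);
        }
        return crc.getValue();
    }

    void recordProduced(long crc) {
        produced.add(crc);
    }

    void recordConsumed(long crc) {
        consumed.add(crc);
    }

    /** CRCs seen on the producer side that never showed up on the consumer side. */
    Set<Long> missing() {
        Set<Long> result = new HashSet<>(produced);
        result.removeAll(consumed);
        return result;
    }
}
```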
>> > Regarding record size, I agree that the bandwidth example was not a good
>> > one. I think it would be hard to get the actual bytes sent over the wire
>> > (your #2), since multiple records get compressed together and we would
>> > need to decide which bytes to attribute to which record. So I am inclined
>> > to only do your #1. However, it still makes more sense to me to just
>> > return the record size including the header, since this is the actual
>> > record size.
>> >
>> > Thanks,
>> > Anna
>> >
>> > On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin <becket....@gmail.com> wrote:
>> >
>> > > Anna,
>> > >
>> > > Using CRC to do end-to-end auditing might be very costly because you
>> > > will need to collect all the CRCs from both the producer and the
>> > > consumer. It also relies on the assumption that the broker does not
>> > > modify the record. Can you shed some light on how end-to-end auditing
>> > > will use the CRC before we decide to expose such a low-level detail to
>> > > the end user? It would also be helpful if you could compare it with
>> > > something like sequence-number-based auditing.
>> > >
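For comparison, the sketch below shows sequence-number-based auditing. It assumes each producer stamps its records with a producer id and a sequence number starting at 0 (for example, from onSend()). It also assumes records from one producer are consumed in order, e.g. because they all go to one partition. `SequenceAudit` is a hypothetical name. Instead of collecting every CRC, the consumer side keeps one counter per producer and reports the gaps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sequence-number audit. Out-of-order delivery would be
 * reported as a gap, so per-producer ordering is assumed.
 */
final class SequenceAudit {
    private final Map<String, Long> nextExpected = new HashMap<>();
    private final List<String> gaps = new ArrayList<>();

    /** Called for each consumed record carrying (producerId, seq). */
    void onRecord(String producerId, long seq) {
        long expected = nextExpected.getOrDefault(producerId, 0L);
        if (seq > expected) {
            // Sequences expected..seq-1 never arrived.
            gaps.add(producerId + ":" + expected + "-" + (seq - 1));
        }
        // Duplicates (seq < expected) never move the watermark backwards.
        nextExpected.put(producerId, Math.max(expected, seq + 1));
    }

    List<String> gaps() {
        return gaps;
    }
}
```

The state here is one long per producer rather than one entry per record. That is the main cost difference from the CRC approach.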
>> > > About the record size, one thing worth noting is that the size of
>> > > Record is not the actual number of bytes sent over the wire if we use
>> > > compression, so it does not really tell users how much bandwidth they
>> > > are using. Personally I think two kinds of size may be useful:
>> > > 1. The record size after serialization, i.e. application bytes. (The
>> > > uncompressed record size can be easily derived as well.)
>> > > 2. The actual bytes sent over the wire.
>> > > We can get (1) easily, but (2) is difficult to get at the Record level
>> > > when we use compression.
>> > >
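The sketch below covers the two sizes that are cheap to compute per record. It assumes access to the serialized key and value bytes. `RecordSizes` is hypothetical. The per-record header overhead is a parameter rather than a hard-coded constant, because it depends on the message format version. The wire size (2) cannot be derived this way once several records are compressed together.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical illustration of size (1), application bytes, and the
 * uncompressed record size derived from it.
 */
final class RecordSizes {
    private RecordSizes() {}

    /** Serialized key + serialized value; a null key or value counts as 0 bytes. */
    static int applicationBytes(byte[] serializedKey, byte[] serializedValue) {
        int keySize = serializedKey == null ? 0 : serializedKey.length;
        int valueSize = serializedValue == null ? 0 : serializedValue.length;
        return keySize + valueSize;
    }

    /** Uncompressed record size = application bytes + fixed per-record header overhead. */
    static int uncompressedRecordSize(byte[] serializedKey, byte[] serializedValue, int headerOverhead) {
        return applicationBytes(serializedKey, serializedValue) + headerOverhead;
    }
}
```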
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner <a...@confluent.io> wrote:
>> > >
>> > > > Hi Becket,
>> > > >
>> > > > The use-case for CRC is end-to-end audit, rather than checking whether
>> > > > a single message is corrupt or not.
>> > > >
>> > > > Regarding record size, I was thinking of extracting the record size
>> > > > from Record. That will include the header overhead as well. I think
>> > > > the total record size will tell users how much bandwidth their
>> > > > messages take. Since the header is relatively small and constant,
>> > > > users will also get an idea of their key/value sizes.
>> > > >
>> > > > Thanks,
>> > > > Anna
>> > > >
>> > > > On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin <becket....@gmail.com> wrote:
>> > > >
>> > > > > I am +1 on #1.2 and #3.
>> > > > >
>> > > > > #2: Regarding CRC, I am not sure if users care about CRC. Is there
>> > > > > any specific use case? Currently we validate messages by calling
>> > > > > ensureValid() to verify the checksum and throw an exception if it
>> > > > > does not match.
>> > > > >
>> > > > > Message size would be useful. We can add that to ConsumerRecord. Can
>> > > > > you clarify the message size you are referring to? Does it include
>> > > > > the message header overhead or not? From the user's point of view,
>> > > > > they probably don't care about header size.
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jiangjie (Becket) Qin
>> > > > >
>> > > > >
>> > > > > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede <n...@confluent.io> wrote:
>> > > > >
>> > > > > > Anna,
>> > > > > >
>> > > > > > Thanks for being diligent.
>> > > > > >
>> > > > > > +1 on #1.2 and sounds good on #3. I recommend adding checksum and
>> > > > > > size fields to RecordMetadata and ConsumerRecord instead of
>> > > > > > exposing metadata piecemeal in the interceptor APIs.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Neha
>> > > > > >
>> > > > > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner <a...@confluent.io> wrote:
>> > > > > >
>> > > > > > > Hi All,
>> > > > > > >
>> > > > > > > The KIP wiki page is now up-to-date with the scope we have agreed
>> > > > > > > on: Producer and Consumer Interceptors with a minimal set of
>> > > > > > > mutable APIs that are not dependent on producer and consumer
>> > > > > > > internal implementation.
>> > > > > > >
>> > > > > > > I have a few more API details that I would like to bring attention
>> > > > > > > to and/or discuss:
>> > > > > > >
>> > > > > > > 1. Handling exceptions
>> > > > > > >
>> > > > > > > Exceptions can provide an additional level of control. For
>> > > > > > > example, we can filter messages on the consumer side or stop
>> > > > > > > messages on the producer side if they don’t have the right field.
>> > > > > > >
>> > > > > > > I see two options:
>> > > > > > > 1.1. For callbacks that can mutate records (onSend and onConsume),
>> > > > > > > propagate exceptions through the original calls
>> > > > > > > (KafkaProducer.send() and KafkaConsumer.poll(), respectively). For
>> > > > > > > other callbacks, catch the exception, log it, and ignore it.
>> > > > > > > 1.2. Catch exceptions from all the interceptor callbacks and
>> > > > > > > ignore them.
>> > > > > > >
>> > > > > > > The issue with 1.1 is that it effectively changes the
>> > > > > > > KafkaProducer.send() and KafkaConsumer.poll() APIs, since they may
>> > > > > > > now throw exceptions that are not documented in the
>> > > > > > > KafkaProducer/KafkaConsumer API. Another option is to allow some
>> > > > > > > exceptions to propagate and ignore others.
>> > > > > > >
>> > > > > > > I think our use-cases do not require propagating exceptions, so I
>> > > > > > > propose option 1.2, unless someone has suggestions or use-cases
>> > > > > > > for propagating exceptions. Please let me know.
>> > > > > > >
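Option 1.2 amounts to wrapping every callback invocation in its own try/catch. The minimal sketch below models callbacks as `java.util.function.Consumer` instances; that choice and the name `SafeCallbacks` are hypothetical and do not reflect the KIP's actual interfaces. A failing interceptor is logged and skipped while the remaining interceptors still run, so nothing new can escape from send() or poll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Hypothetical sketch of option 1.2: each interceptor callback runs inside
 * its own try/catch. Not thread-safe; one instance per client is assumed.
 */
final class SafeCallbacks<T> {
    private static final Logger LOG = Logger.getLogger(SafeCallbacks.class.getName());

    private final List<Consumer<T>> callbacks = new ArrayList<>();
    private int failures = 0;

    void add(Consumer<T> callback) {
        callbacks.add(callback);
    }

    /** Invokes every callback; one failing callback does not stop the others. */
    void invokeAll(T argument) {
        for (Consumer<T> callback : callbacks) {
            try {
                callback.accept(argument);
            } catch (RuntimeException e) {
                // Consumer.accept declares no checked exceptions, so RuntimeException is caught here.
                failures++;
                LOG.log(Level.WARNING, "Interceptor callback failed; ignoring", e);
            }
        }
    }

    int failures() {
        return failures;
    }
}
```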
>> > > > > > > 2. Intercepting record CRC and record size
>> > > > > > >
>> > > > > > > Since we decided not to add any intermediate callbacks (such as
>> > > > > > > onEnqueue or onReceive) to interceptors, I think it is still
>> > > > > > > valuable to intercept record CRC and record size in bytes for
>> > > > > > > monitoring and audit use-cases.
>> > > > > > >
>> > > > > > > I propose to add checksum and size fields to RecordMetadata and
>> > > > > > > ConsumerRecord. Another option would be to add them as parameters
>> > > > > > > in the onAcknowledgement() and onConsume() callbacks.
>> > > > > > >
>> > > > > > > 3. Callbacks that allow modifying records look as follows:
>> > > > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
>> > > > > > > ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
>> > > > > > >
>> > > > > > > This means that interceptors can potentially modify
>> > > > > > > topic/partition in ProducerRecord and topic/partition/offset in
>> > > > > > > ConsumerRecord. I propose that it is up to the interceptor
>> > > > > > > implementation to ensure that topic/partition, etc. are correct.
>> > > > > > > KafkaProducer.send() will use the topic, partition, key, and value
>> > > > > > > from the ProducerRecord returned from onSend(). Similarly, the
>> > > > > > > ConsumerRecords returned from KafkaConsumer.poll() would be the
>> > > > > > > ones returned from the interceptor.
>> > > > > > >
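The semantics proposed in point 3 can be modeled as a chain. Each interceptor's onSend() receives the record returned by the previous one, and the producer sends whatever the last one returns. `OutgoingRecord` is a hypothetical, trimmed-down stand-in for ProducerRecord, and `OnSendChain` is illustrative only. The chain does not validate the topic or partition; as proposed, that is the interceptor's responsibility.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical, trimmed-down stand-in for ProducerRecord. */
final class OutgoingRecord {
    final String topic;
    final Integer partition; // null: let the partitioner choose
    final String key;
    final String value;

    OutgoingRecord(String topic, Integer partition, String key, String value) {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}

/**
 * Sketch of the proposed onSend() semantics: interceptors run in order, each
 * one sees the previous one's output, and the final result is what gets sent.
 */
final class OnSendChain {
    private final List<UnaryOperator<OutgoingRecord>> interceptors = new ArrayList<>();

    void add(UnaryOperator<OutgoingRecord> onSend) {
        interceptors.add(onSend);
    }

    OutgoingRecord apply(OutgoingRecord original) {
        OutgoingRecord current = original;
        for (UnaryOperator<OutgoingRecord> onSend : interceptors) {
            current = onSend.apply(current);
        }
        // Topic, partition, key and value are all taken from this result.
        return current;
    }
}
```

Suppose the first interceptor reroutes a record to another topic and the second pins it to partition 0. The record is then sent to partition 0 of the new topic, whatever the application originally asked for.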
>> > > > > > >
>> > > > > > >
>> > > > > > > Please let me know if you have any suggestions or objections to
>> > > > > > > the above.
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Anna
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner <a...@confluent.io> wrote:
>> > > > > > >
>> > > > > > > > Hi Mayuresh,
>> > > > > > > >
>> > > > > > > > I see why you would want to check for messages left in the
>> > > > > > > > RecordAccumulator. However, I don't think this will completely
>> > > > > > > > solve the problem. Messages could be in flight somewhere else,
>> > > > > > > > like in the socket, or there may be in-flight messages on the
>> > > > > > > > consumer side of the MirrorMaker. So, if we go the route of
>> > > > > > > > checking whether there are any in-flight messages for the topic
>> > > > > > > > deletion use-case, maybe it is better to count them with onSend()
>> > > > > > > > and onAcknowledgement() to see whether all messages sent were
>> > > > > > > > acknowledged. I also think that it would be better to solve this
>> > > > > > > > without interceptors, for example by fixing error handling in
>> > > > > > > > this scenario. However, I do not have any good proposal right
>> > > > > > > > now, so these are just general thoughts.
>> > > > > > > >
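Counting with onSend() and onAcknowledgement() could look roughly like the sketch below. It assumes the topic name is available in both callbacks and that every record passed to onSend() gets exactly one acknowledgement, whether success or failure. `InFlightTracker` is hypothetical. A zero count covers only the producer. It says nothing about records still being processed on the consumer side of MirrorMaker.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical in-flight tracker: onSend() increments a per-topic counter and
 * onAcknowledgement() decrements it, whether the send succeeded or failed.
 */
final class InFlightTracker {
    private final ConcurrentMap<String, AtomicLong> inFlight = new ConcurrentHashMap<>();

    void onSend(String topic) {
        inFlight.computeIfAbsent(topic, t -> new AtomicLong()).incrementAndGet();
    }

    void onAcknowledgement(String topic) {
        AtomicLong counter = inFlight.get(topic);
        if (counter != null) {
            // Assumes exactly one acknowledgement per onSend().
            counter.decrementAndGet();
        }
    }

    /** Zero means every record handed to send() for this topic has been acknowledged. */
    long inFlight(String topic) {
        AtomicLong counter = inFlight.get(topic);
        return counter == null ? 0 : counter.get();
    }
}
```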
>> > > > > > > > Thanks,
>> > > > > > > > Anna
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <
>> > > > > > > > gharatmayures...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > >> Calling producer.flush() flushes all the data, so this is OK.
>> > > > > > > >> But when you are running Mirror maker, I am not sure there is a
>> > > > > > > >> way to flush() from outside.
>> > > > > > > >>
>> > > > > > > >>
>> > > > > > > >> Thanks,
>> > > > > > > >>
>> > > > > > > >> Mayuresh
>> > > > > > > >>
>> > > > > > > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin <becket....@gmail.com> wrote:
>> > > > > > > >>
>> > > > > > > >> > Mayuresh,
>> > > > > > > >> >
>> > > > > > > >> > Regarding your use case about mirror maker: is it good enough
>> > > > > > > >> > as long as we know there are no more messages for the topic in
>> > > > > > > >> > the producer? If that is the case, calling producer.flush() is
>> > > > > > > >> > sufficient.
>> > > > > > > >> >
>> > > > > > > >> > Thanks,
>> > > > > > > >> >
>> > > > > > > >> > Jiangjie (Becket) Qin
>> > > > > > > >> >
>> > > > > > > >> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
>> > > > > > > >> >
>> > > > > > > >> > > Hi Anna,
>> > > > > > > >> > >
>> > > > > > > >> > > Thanks a lot for summarizing the discussion on this KIP.
>> > > > > > > >> > >
>> > > > > > > >> > > It LGTM.
>> > > > > > > >> > > This is really nice:
>> > > > > > > >> > > We decided not to add any callbacks to producer and consumer
>> > > > > > > >> > > interceptors that will depend on internal implementation as
>> > > > > > > >> > > part of this KIP.
>> > > > > > > >> > > *However, it is possible to add them later as part of another
>> > > > > > > >> > > KIP if there are good use-cases.*
>> > > > > > > >> > >
>> > > > > > > >> > > Do you agree with the use case I explained earlier for knowing
>> > > > > > > >> > > the number of records left in the RecordAccumulator for a
>> > > > > > > >> > > particular topic? It might be orthogonal to this KIP, but it
>> > > > > > > >> > > will be helpful. What do you think?
>> > > > > > > >> > >
>> > > > > > > >> > > Thanks,
>> > > > > > > >> > >
>> > > > > > > >> > > Mayuresh
>> > > > > > > >> > >
>> > > > > > > >> > >
>> > > > > > > >> > > On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino <tpal...@gmail.com> wrote:
>> > > > > > > >> > >
>> > > > > > > >> > > > This looks good. As noted, having one mutable interceptor on
>> > > > > > > >> > > > each side allows for the use cases we can envision right
>> > > > > > > >> > > > now, and I think that’s going to provide a great deal of
>> > > > > > > >> > > > opportunity for implementing things like audit, especially
>> > > > > > > >> > > > within a multi-tenant environment. Looking forward to getting
>> > > > > > > >> > > > this available in the clients.
>> > > > > > > >> > > >
>> > > > > > > >> > > > Thanks!
>> > > > > > > >> > > >
>> > > > > > > >> > > > -Todd
>> > > > > > > >> > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner <a...@confluent.io> wrote:
>> > > > > > > >> > > >
>> > > > > > > >> > > > > Hi All,
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Here are the meeting notes from today’s KIP meeting:
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > 1. We agreed to keep the scope of this KIP to producer and
>> > > > > > > >> > > > > consumer interceptors only. A broker-side interceptor will
>> > > > > > > >> > > > > be added later as a separate KIP. The reasons were already
>> > > > > > > >> > > > > mentioned in this thread, but the summary is:
>> > > > > > > >> > > > >  * A broker interceptor is riskier and requires careful
>> > > > > > > >> > > > > consideration of overheads, whether to intercept leaders
>> > > > > > > >> > > > > vs. leaders/replicas, what to do on leader failover, and so
>> > > > > > > >> > > > > on.
>> > > > > > > >> > > > >  * Broker interceptors increase monitoring resolution, but
>> > > > > > > >> > > > > not including them in this KIP does not reduce the
>> > > > > > > >> > > > > usefulness of producer and consumer interceptors that
>> > > > > > > >> > > > > enable end-to-end monitoring.
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > 2. We agreed to scope ProducerInterceptor and
>> > > > > > > >> > > > > ConsumerInterceptor callbacks to a minimal set of mutable
>> > > > > > > >> > > > > APIs that are not dependent on producer and consumer
>> > > > > > > >> > > > > internal implementation.
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > ProducerInterceptor:
>> > > > > > > >> > > > > *ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);*
>> > > > > > > >> > > > > *void onAcknowledgement(RecordMetadata metadata, Exception exception);*
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > ConsumerInterceptor:
>> > > > > > > >> > > > > *ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);*
>> > > > > > > >> > > > > *void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);*
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > We will allow interceptors to modify ProducerRecord on the
>> > > > > > > >> > > > > producer side and ConsumerRecords on the consumer side.
>> > > > > > > >> > > > > This will support end-to-end monitoring and auditing, as
>> > > > > > > >> > > > > well as the ability to add metadata to a message. This will
>> > > > > > > >> > > > > support Todd’s Auditing and Routing use-cases.
>> > > > > > > >> > > > >
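The sketch below illustrates the auditing use case. An onSend()-style transformation prefixes each value with audit fields such as host, service and timestamp. `AuditStamper` and the `key=value;...|payload` layout are assumptions made for illustration; a real deployment would more likely carry these fields in the message schema or an envelope. The key is left untouched, so partition assignment is unaffected. The clock is injected so that the output is deterministic.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical audit interceptor logic. The header fields must not contain
 * '|' (the separator); the payload itself may.
 */
final class AuditStamper {
    private final String hostname;
    private final String service;
    private final LongSupplier clock;

    AuditStamper(String hostname, String service, LongSupplier clock) {
        this.hostname = hostname;
        this.service = service;
        this.clock = clock;
    }

    /** Producer side (onSend): returns a new value; the key is not modified. */
    String stamp(String value) {
        return "host=" + hostname + ";service=" + service + ";ts=" + clock.getAsLong() + "|" + value;
    }

    /** Consumer side: strips the audit header and returns the original payload. */
    static String payload(String stamped) {
        int separator = stamped.indexOf('|');
        return separator < 0 ? stamped : stamped.substring(separator + 1);
    }

    /** Consumer side: returns the audit header, e.g. to compute transport time. */
    static String header(String stamped) {
        int separator = stamped.indexOf('|');
        return separator < 0 ? "" : stamped.substring(0, separator);
    }
}
```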
>> > > > > > > >> > > > > We did not find any use-case for modifying records in the
>> > > > > > > >> > > > > onConsume() callback, but decided to enable modification of
>> > > > > > > >> > > > > consumer records for symmetry with onSend().
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > 3. We agreed to ensure compatibility when/if we add new
>> > > > > > > >> > > > > methods to ProducerInterceptor and ConsumerInterceptor by
>> > > > > > > >> > > > > using default methods with an empty implementation. It is
>> > > > > > > >> > > > > OK to assume Java 8. (This is Ismael’s method #2.)
>> > > > > > > >> > > > >
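Point 3 relies on Java 8 default methods. The minimal sketch below shows how they keep existing interceptors compiling. `AckInterceptor`, its simplified `String topic` parameters, and the later-added `onEnqueued()` are all hypothetical stand-ins for the real interceptor interfaces.

```java
/**
 * Hypothetical interceptor interface: version 1 plus a callback added later.
 * The new callback is a Java 8 default method with an empty body.
 */
interface AckInterceptor {
    // Present since the first release of the interface.
    void onAcknowledgement(String topic, Exception exception);

    // Added in a later release (hypothetical); existing implementations inherit this no-op.
    default void onEnqueued(String topic) {
    }
}

/** Written against version 1; compiles unchanged after onEnqueued() is added. */
final class CountingInterceptor implements AckInterceptor {
    int acknowledgements = 0;

    @Override
    public void onAcknowledgement(String topic, Exception exception) {
        acknowledgements++;
    }
}
```

The trade-off is that a default method can only supply neutral behavior (here, doing nothing). Older interceptors keep working, but they do not benefit from the new callback until they override it.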
>> > > > > > > >> > > > > 4. We decided not to add any callbacks to producer and
>> > > > > > > >> > > > > consumer interceptors that will depend on internal
>> > > > > > > >> > > > > implementation as part of this KIP. However, it is possible
>> > > > > > > >> > > > > to add them later as part of another KIP if there are good
>> > > > > > > >> > > > > use-cases.
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > *Reasoning.* We did not have concrete use-cases that
>> > > > > > > >> > > > > justified more methods at this point. Some of the use-cases
>> > > > > > > >> > > > > were for more fine-grained latency collection, which could
>> > > > > > > >> > > > > be done with Kafka Metrics. Another use-case was
>> > > > > > > >> > > > > encryption. However, there are several design options for
>> > > > > > > >> > > > > encryption. One is to do per-record encryption, which would
>> > > > > > > >> > > > > require adding ProducerInterceptor.onEnqueued() and
>> > > > > > > >> > > > > ConsumerInterceptor.onReceive(). One could argue that in
>> > > > > > > >> > > > > that case encryption could be done by adding a custom
>> > > > > > > >> > > > > serializer/deserializer. Another option is to do encryption
>> > > > > > > >> > > > > after the message gets compressed, but then issues arise
>> > > > > > > >> > > > > when the broker does re-compression. We decided that it is
>> > > > > > > >> > > > > better to have that discussion in a separate KIP and decide
>> > > > > > > >> > > > > whether this is something we want to do with interceptors
>> > > > > > > >> > > > > or by other means.
>> > > > > > > >> > > > >
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Todd, Mayuresh and others who missed the KIP meeting,
>> > > > > > > >> > > > > please let me know your thoughts on the scope we agreed on
>> > > > > > > >> > > > > during the meeting.
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > I will update the KIP proposal with the current decision by
>> > > > > > > >> > > > > the end of today.
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > Thanks,
>> > > > > > > >> > > > > Anna
>> > > > > > > >> > > > >
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <
>> > > > > > > >> > > > > gharatmayures...@gmail.com> wrote:
>> > > > > > > >> > > > >
>> > > > > > > >> > > > > > Hi,
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > I won't be able to make it to the KIP hangout due to a
>> > > > > > > >> > > > > > conflict.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Anna, here is the use case where knowing whether there are
>> > > > > > > >> > > > > > messages left in the RecordAccumulator to be sent to the
>> > > > > > > >> > > > > > Kafka cluster for a topic is useful.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > 1) Consider a pipeline:
>> > > > > > > >> > > > > > A ---> Mirror-maker -----> B
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > 2) We have a topic T in cluster A mirrored to cluster B.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > 3) Now if we delete topic T in A and immediately proceed to
>> > > > > > > >> > > > > > delete the topic in cluster B, some of the Mirror-maker
>> > > > > > > >> > > > > > machines die because at least one of the batches in the
>> > > > > > > >> > > > > > RecordAccumulator for topic T fails to be produced to
>> > > > > > > >> > > > > > cluster B. We have seen this happening in our clusters.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > If we know that there are no more messages left in the
>> > > > > > > >> > > > > > RecordAccumulator to be produced to cluster B, we can
>> > > > > > > >> > > > > > safely delete the topic in cluster B without disturbing
>> > > > > > > >> > > > > > the pipeline.
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Thanks,
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > Mayuresh
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner <a...@confluent.io> wrote:
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > > Thanks Ismael and Todd for your feedback!
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > I agree about coming up with lean but useful interfaces
>> > > > > > > >> > > > > > > that will be easy to extend later.
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > When we discuss the minimal set of producer and consumer
>> > > > > > > >> > > > > > > interceptor APIs in today’s KIP meeting (discussion item
>> > > > > > > >> > > > > > > #2 in my previous email), let's compare two options:
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > *1. Minimal set of immutable APIs for producer and
>> > > > > > > >> > > > > > > consumer interceptors*
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > ProducerInterceptor:
>> > > > > > > >> > > > > > > public void onSend(ProducerRecord<K, V> record);
>> > > > > > > >> > > > > > > public void onAcknowledgement(RecordMetadata metadata, Exception exception);
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > ConsumerInterceptor:
>> > > > > > > >> > > > > > > public void onConsume(ConsumerRecords<K, V> records);
>> > > > > > > >> > > > > > > public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > Use-cases:
>> > > > > > > >> > > > > > > - end-to-end monitoring; custom tracing and logging
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > *2. Minimal set of mutable APIs for producer and consumer
>> > > > > > > >> > > > > > > interceptors*
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > ProducerInterceptor:
>> > > > > > > >> > > > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
>> > > > > > > >> > > > > > > void onAcknowledgement(RecordMetadata metadata, Exception exception);
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > ConsumerInterceptor:
>> > > > > > > >> > > > > > > void onConsume(ConsumerRecords<K, V> records);
>> > > > > > > >> > > > > > > void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > Additional use-cases to #1:
>> > > > > > > >> > > > > > > - Ability to add metadata to a message or fill in standard
>> > > > > > > >> > > > > > > fields for audit and routing.
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > Implications:
>> > > > > > > >> > > > > > > - Partition assignment will be done based on the modified
>> > > > > > > >> > > > > > > key/value instead of the original key/value. If the
>> > > > > > > >> > > > > > > key/value transformation is not consistent (the same key
>> > > > > > > >> > > > > > > and value do not always mutate to the same modified
>> > > > > > > >> > > > > > > key/value), then log compaction would not work. However,
>> > > > > > > >> > > > > > > the audit and routing use-cases from Todd will likely do
>> > > > > > > >> > > > > > > consistent transformation.
>> > > > > > > >> > > > > > >
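A small check shows why consistency matters, assuming partitioning is a pure function of the key returned by onSend(). The hash-mod partitioner below is a simplified stand-in (Kafka's default partitioner hashes the serialized key with murmur2), and `KeyTransformCheck` is hypothetical. Suppose the same input key does not always map to the same output key. Then records for one logical key can land in different partitions, and log compaction treats them as distinct keys, so older values are never compacted away.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.UnaryOperator;

/** Hypothetical helpers for reasoning about key-mutating interceptors. */
final class KeyTransformCheck {
    private KeyTransformCheck() {}

    /** Simplified partitioner: non-negative hash of the key bytes modulo the partition count. */
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(bytes), numPartitions);
    }

    /**
     * A transformation is consistent if applying it to the same key twice
     * gives the same key; equal keys then also get equal partitions and
     * compact together.
     */
    static boolean isConsistent(UnaryOperator<String> transform, String key) {
        return transform.apply(key).equals(transform.apply(key));
    }
}
```

Prefixing a tenant name passes this check. Appending a timestamp or a counter to the key fails it.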
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > *Additional callbacks (discussion item #3 in my previous
>> > > > > > > >> > > > > > > email):*
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > If we want to support encryption, we would want to be able
>> > > > > > > >> > > > > > > to modify the serialized key/value, rather than the key and
>> > > > > > > >> > > > > > > value objects. This will add the following APIs to
>> > > > > > > >> > > > > > > producer and consumer interceptors:
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > ProducerInterceptor:
>> > > > > > > >> > > > > > > SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord<K, V> record, SerializedKeyValue serializedKeyValue);
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > ConsumerInterceptor:
>> > > > > > > >> > > > > > > SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue);
>> > > > > > > >> > > > > > >
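For the encryption use case, the sketch below shows the work an onEnqueued()/onReceive() pair would do on serialized bytes, using the JDK's AES-GCM support. `RecordCipher` is hypothetical, the IV-then-ciphertext layout is an assumption of this sketch, and key distribution and rotation are out of scope. Each call uses a fresh random IV, so encrypting the same bytes twice gives different output. Applied to keys, that is exactly the inconsistent transformation that breaks partitioning and log compaction, so a sketch like this would only be applied to values.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

/**
 * Hypothetical per-record encryption of serialized value bytes.
 * Output layout (assumed): 12-byte IV followed by ciphertext and 16-byte tag.
 */
final class RecordCipher {
    private static final int IV_BYTES = 12;
    private static final int TAG_BITS = 128;

    private final SecretKey key;
    private final SecureRandom random = new SecureRandom();

    RecordCipher(SecretKey key) {
        this.key = key;
    }

    /** Producer side (onEnqueued in the proposal): encrypt serialized bytes. */
    byte[] encrypt(byte[] plaintext) throws GeneralSecurityException {
        byte[] iv = new byte[IV_BYTES];
        random.nextBytes(iv); // fresh IV per record; never reuse an IV with the same key
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        byte[] ciphertext = cipher.doFinal(plaintext);
        return ByteBuffer.allocate(IV_BYTES + ciphertext.length).put(iv).put(ciphertext).array();
    }

    /** Consumer side (onReceive in the proposal): decrypt and authenticate. */
    byte[] decrypt(byte[] ivAndCiphertext) throws GeneralSecurityException {
        ByteBuffer buffer = ByteBuffer.wrap(ivAndCiphertext);
        byte[] iv = new byte[IV_BYTES];
        buffer.get(iv);
        byte[] ciphertext = new byte[buffer.remaining()];
        buffer.get(ciphertext);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        return cipher.doFinal(ciphertext);
    }
}
```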
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > I am leaning towards implementing the minimal set of
>> > > > > > > >> > > > > > > immutable or mutable interfaces, making sure that we have
>> > > > > > > >> > > > > > > a compatibility plan that allows us to add more callbacks
>> > > > > > > >> > > > > > > in the future (per Ismael’s comment), and adding more APIs
>> > > > > > > >> > > > > > > later. E.g., for the encryption use-case, there could be
>> > > > > > > >> > > > > > > an argument for doing encryption after message compression
>> > > > > > > >> > > > > > > vs. per-record encryption that could be done using the
>> > > > > > > >> > > > > > > above additional API. There are also more implications for
>> > > > > > > >> > > > > > > every API that modifies records: modifying the serialized
>> > > > > > > >> > > > > > > key/value will again impact partition assignment (we will
>> > > > > > > >> > > > > > > likely do that after partition assignment), which may
>> > > > > > > >> > > > > > > impact log compaction and mirror maker partitioning.
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > Thanks,
>> > > > > > > >> > > > > > > Anna
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino <tpal...@gmail.com> wrote:
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > > > > Finally got a chance to take a look at this. I won’t be
>> > > > > > > >> > > > > > > > able to make the KIP meeting due to a conflict.
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > I’m somewhat disappointed in this proposal. I think that
>> > > > > > > >> > > > > > > > the explicit exclusion of modification of the messages is
>> > > > > > > >> > > > > > > > short-sighted, and not accounting for it now is going to
>> > > > > > > >> > > > > > > > bite us later. Jay, aren’t you the one railing against
>> > > > > > > >> > > > > > > > public interfaces and how difficult they are to work with
>> > > > > > > >> > > > > > > > when you don’t get them right? The “simple” change to one
>> > > > > > > >> > > > > > > > of these interfaces to make it able to return a record is
>> > > > > > > >> > > > > > > > going to be a significant change and is going to require
>> > > > > > > >> > > > > > > > all clients to rewrite their interceptors. If we’re not
>> > > > > > > >> > > > > > > > willing to put in the time to think through manipulation
>> > > > > > > >> > > > > > > > now, then this KIP should be shelved until we are.
>> > > > > > > >> > > > > > > > Implementing something halfway is going to be worse than
>> > > > > > > >> > > > > > > > taking a little longer. In addition, I don’t believe that
>> > > > > > > >> > > > > > > > manipulation requires anything more than for interceptors
>> > > > > > > >> > > > > > > > to receive the full record and then return it.
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > There are three use cases I can think of right now, without any deep
>> > > > > > > >> > > > > > > > discussion, that can make use of interceptors with modification:
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > 1. Auditing. The ability to add metadata to a message for auditing is
>> > > > > > > >> > > > > > > > critical. Hostname, service name, timestamps, etc. are all pieces of
>> > > > > > > >> > > > > > > > data that can be used on the other side of the pipeline to categorize
>> > > > > > > >> > > > > > > > messages, determine loss and transport time, and pin down issues. You
>> > > > > > > >> > > > > > > > may say that these things can just be part of the message schema, but
>> > > > > > > >> > > > > > > > anyone who has worked with a multi-user data system (especially those
>> > > > > > > >> > > > > > > > who have been involved with LinkedIn) knows how difficult it is to
>> > > > > > > >> > > > > > > > maintain consistent message schemas and to get other people to put in
>> > > > > > > >> > > > > > > > fields for your use.
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > 2. Encryption. This is probably the most obvious case for record
>> > > > > > > >> > > > > > > > manipulation on both sides. The ability to tie in end-to-end
>> > > > > > > >> > > > > > > > encryption is important for data that requires external compliance
>> > > > > > > >> > > > > > > > (PCI, HIPAA, etc.).
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > 3. Routing. By being able to add a bit of information about the source
>> > > > > > > >> > > > > > > > or destination of a message to the metadata, you can easily construct
>> > > > > > > >> > > > > > > > an intelligent mirror maker that can prevent loops. This has the
>> > > > > > > >> > > > > > > > opportunity to result in significant operational savings, as you can
>> > > > > > > >> > > > > > > > get rid of the need for tiered clusters in order to prevent loops in
>> > > > > > > >> > > > > > > > mirroring messages.
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > All three of these share the feature that they add metadata to
>> > > > > > > >> > > > > > > > messages. With the pushback on having arbitrary metadata as an
>> > > > > > > >> > > > > > > > “envelope” to the message, this is a way to provide it and make it the
>> > > > > > > >> > > > > > > > responsibility of the client, and not the Kafka broker and system
>> > > > > > > >> > > > > > > > itself.
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > -Todd
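Todd's argument is that modification needs nothing more than an interceptor that receives the full record and returns it. A minimal Java sketch of that contract, covering his auditing and routing cases, follows. `AuditedRecord`, `MutatingInterceptor`, the metadata keys and the loop check are all hypothetical names made up for illustration; they are not part of the KIP or of the Kafka client API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record type: an opaque application payload plus a separate
// metadata map acting as the "envelope". Not a Kafka client class.
final class AuditedRecord {
    final String topic;
    final byte[] value;
    final Map<String, String> metadata;

    AuditedRecord(String topic, byte[] value, Map<String, String> metadata) {
        this.topic = topic;
        this.value = value;
        this.metadata = Collections.unmodifiableMap(new HashMap<>(metadata));
    }

    // Returns a new record with one extra metadata entry; the payload is untouched.
    AuditedRecord withMetadata(String key, String val) {
        Map<String, String> copy = new HashMap<>(metadata);
        copy.put(key, val);
        return new AuditedRecord(topic, value, copy);
    }
}

// Hypothetical contract: the interceptor receives the full record and
// returns the record the producer should actually send.
interface MutatingInterceptor {
    AuditedRecord onSend(AuditedRecord record);
}

// Use case 1 (auditing): stamp host, service name and send time.
final class AuditStampInterceptor implements MutatingInterceptor {
    private final String host;
    private final String service;

    AuditStampInterceptor(String host, String service) {
        this.host = host;
        this.service = service;
    }

    @Override
    public AuditedRecord onSend(AuditedRecord record) {
        return record.withMetadata("audit.host", host)
                .withMetadata("audit.service", service)
                .withMetadata("audit.sendMs", Long.toString(System.currentTimeMillis()));
    }
}

// Use case 3 (routing): remember the first cluster a record was produced to,
// without overwriting an origin stamped further upstream.
final class OriginStampInterceptor implements MutatingInterceptor {
    private final String cluster;

    OriginStampInterceptor(String cluster) {
        this.cluster = cluster;
    }

    @Override
    public AuditedRecord onSend(AuditedRecord record) {
        return record.metadata.containsKey("route.origin")
                ? record
                : record.withMetadata("route.origin", cluster);
    }
}

final class InterceptorChain {
    // Applies interceptors in order, feeding each one the previous output.
    static AuditedRecord apply(List<? extends MutatingInterceptor> chain, AuditedRecord record) {
        AuditedRecord current = record;
        for (MutatingInterceptor interceptor : chain) {
            current = interceptor.onSend(current);
        }
        return current;
    }

    // A mirror maker could skip records that originated in the destination
    // cluster, which breaks A -> B -> A mirroring loops.
    static boolean shouldMirror(AuditedRecord record, String destinationCluster) {
        return !destinationCluster.equals(record.metadata.get("route.origin"));
    }
}
```

Because each interceptor returns a new record instead of mutating its input, interceptors can be chained and the application's original record is left unchanged.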
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma <ism...@juma.me.uk> wrote:
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > > Hi Anna and Neha,
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > I think it makes a lot of sense to try and keep the interface lean and
>> > > > > > > >> > > > > > > > > to add more methods later when/if there is a need. What is the current
>> > > > > > > >> > > > > > > > > thinking with regard to compatibility when/if we add new methods? A
>> > > > > > > >> > > > > > > > > few options come to mind:
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > 1. Change the interface to an abstract class with empty implementations
>> > > > > > > >> > > > > > > > > for all the methods. This means that the path to adding new methods is
>> > > > > > > >> > > > > > > > > clear.
>> > > > > > > >> > > > > > > > > 2. Hope we have moved to Java 8 by the time we need to add new methods
>> > > > > > > >> > > > > > > > > and use default methods with an empty implementation for any new method
>> > > > > > > >> > > > > > > > > (and potentially make existing methods default methods too at that
>> > > > > > > >> > > > > > > > > point for consistency).
>> > > > > > > >> > > > > > > > > 3. Introduce a new interface that inherits from the existing Interceptor
>> > > > > > > >> > > > > > > > > interface when we need to add new methods.
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > Option 1 is the easiest and it also means that interceptor users only
>> > > > > > > >> > > > > > > > > need to override the methods that they are interested in (more useful
>> > > > > > > >> > > > > > > > > if the number of methods grows). The downside is that interceptor
>> > > > > > > >> > > > > > > > > implementations cannot inherit from another class (a straightforward
>> > > > > > > >> > > > > > > > > workaround is to make the interceptor a forwarder that calls another
>> > > > > > > >> > > > > > > > > class). Also, our existing callbacks are interfaces, so it seems a bit
>> > > > > > > >> > > > > > > > > inconsistent.
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > Option 2 may be the most appealing one as both users and ourselves
>> > > > > > > >> > > > > > > > > retain flexibility. The main downside is that it relies on us moving to
>> > > > > > > >> > > > > > > > > Java 8, which may potentially be more than a year away (if we support
>> > > > > > > >> > > > > > > > > the last 2 Java releases).
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > Thoughts?
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > Ismael
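The three evolution options Ismael lists can be compared side by side in a small Java 8 sketch. All type and method names here are hypothetical (`onDequeued` simply stands in for "a method added in a later release"); the point is only how each option lets an older implementation keep compiling once a method is added.

```java
import java.util.ArrayList;
import java.util.List;

// Option 1: an abstract class with empty implementations. A method added in a
// later release gets an empty body here, so existing subclasses keep compiling.
abstract class AbstractSendInterceptor {
    public String onSend(String record) {
        return record;
    }

    public void onAcknowledgement(String record, Exception error) {
    }
}

// Under option 1, a subclass overrides only what it needs, but it can no
// longer extend any other class.
final class TrimInterceptor extends AbstractSendInterceptor {
    @Override
    public String onSend(String record) {
        return record.trim();
    }
}

// Option 2: a Java 8 interface. onDequeued stands in for a method added later;
// its default body means older implementations need no change.
interface SendInterceptorV2 {
    String onSend(String record);

    default void onDequeued(String record) {
    }
}

// Written before onDequeued existed; still compiles and runs under option 2.
final class UpperCaseInterceptor implements SendInterceptorV2 {
    @Override
    public String onSend(String record) {
        return record.toUpperCase();
    }
}

// Option 3: freeze the original interface and add the new method in a sub-interface.
interface SendInterceptor {
    String onSend(String record);
}

interface DequeueAwareInterceptor extends SendInterceptor {
    void onDequeued(String record);
}

final class DequeueLogger implements DequeueAwareInterceptor {
    final List<String> seen = new ArrayList<>();

    @Override
    public String onSend(String record) {
        return record;
    }

    @Override
    public void onDequeued(String record) {
        seen.add(record);
    }
}

final class Option3Dispatcher {
    // The client has to check which interface each instance implements.
    static void dequeued(SendInterceptor interceptor, String record) {
        if (interceptor instanceof DequeueAwareInterceptor) {
            ((DequeueAwareInterceptor) interceptor).onDequeued(record);
        }
    }
}
```

Note that option 3 moves an `instanceof` check into the calling code, whereas options 1 and 2 let the caller invoke the new method unconditionally.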
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede <n...@confluent.io> wrote:
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > > > > Anna,
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > I'm also in favor of including just the APIs for which we have a clear
>> > > > > > > >> > > > > > > > > > use case. If more use cases for finer monitoring show up in the future,
>> > > > > > > >> > > > > > > > > > we can always update the interface. Would you please highlight in the
>> > > > > > > >> > > > > > > > > > KIP the APIs that you think we have an immediate use for?
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > Joel,
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > Broker-side monitoring makes a lot of sense in the long term, though I
>> > > > > > > >> > > > > > > > > > don't think it is a requirement for end-to-end monitoring. With the
>> > > > > > > >> > > > > > > > > > producer and consumer interceptors, you have the ability to get full
>> > > > > > > >> > > > > > > > > > publish-to-subscribe end-to-end monitoring. The broker interceptor
>> > > > > > > >> > > > > > > > > > certainly improves the resolution of monitoring, but it is also a
>> > > > > > > >> > > > > > > > > > riskier change. I prefer an incremental approach over a big bang and
>> > > > > > > >> > > > > > > > > > recommend taking baby steps. Let's first make sure the
>> > > > > > > >> > > > > > > > > > producer/consumer interceptors are successful, and then come back and
>> > > > > > > >> > > > > > > > > > add the broker interceptor carefully.
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > Having said that, it would be great to understand your proposal for
>> > > > > > > >> > > > > > > > > > the broker interceptor independently. We can add an interceptor either
>> > > > > > > >> > > > > > > > > > on-append or on-commit. If people want to use this for monitoring, then
>> > > > > > > >> > > > > > > > > > on-commit might be more useful?
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > Thanks,
>> > > > > > > >> > > > > > > > > > Neha
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps <j...@confluent.io> wrote:
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > Hey Joel,
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > What is the interface you are thinking of? Something like this:
>> > > > > > > >> > > > > > > > > > >     onAppend(String topic, int partition, Records records, long time)
>> > > > > > > >> > > > > > > > > > > ?
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > One challenge right now is that we are still using the old
>> > > > > > > >> > > > > > > > > > > Message/MessageSet classes on the broker, which I'm not sure we'd want
>> > > > > > > >> > > > > > > > > > > to support over the long haul, but it might be okay just to create a
>> > > > > > > >> > > > > > > > > > > Records instance for this interface.
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > -Jay
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > I'm definitely in favor of having such hooks in the produce/consume
>> > > > > > > >> > > > > > > > > > > > life cycle. Not sure if people remember this, but in Kafka 0.7 this was
>> > > > > > > >> > > > > > > > > > > > pretty much how it was:
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
>> > > > > > > >> > > > > > > > > > > > i.e., we had something similar to the interceptor proposal for various
>> > > > > > > >> > > > > > > > > > > > stages of the producer request. The producer provided callbacks for
>> > > > > > > >> > > > > > > > > > > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
>> > > > > > > >> > > > > > > > > > > > LinkedIn we in fact did auditing within these callbacks (and not
>> > > > > > > >> > > > > > > > > > > > explicitly in the wrapper). Over time and with 0.8 we moved that out to
>> > > > > > > >> > > > > > > > > > > > the wrapper libraries.
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > On a side note, while audit and other monitoring can be done internally
>> > > > > > > >> > > > > > > > > > > > in a convenient way, I think it should be clarified that having a
>> > > > > > > >> > > > > > > > > > > > wrapper is in general not a bad idea, and I would even consider it to
>> > > > > > > >> > > > > > > > > > > > be a best practice. Even with 0.7 we still had a wrapper library, and
>> > > > > > > >> > > > > > > > > > > > that API has largely stayed the same and has helped protect against
>> > > > > > > >> > > > > > > > > > > > (sometimes backwards-incompatible) changes in open source.
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > While we are on this topic, I have one comment. Anna, you may have
>> > > > > > > >> > > > > > > > > > > > already considered this, but I don't see mention of it in the KIP:
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > Add a custom message interceptor/validator on the broker on message
>> > > > > > > >> > > > > > > > > > > > arrival.
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > We decompress and do basic validation of messages on arrival. I think
>> > > > > > > >> > > > > > > > > > > > there is value in supporting custom validation and expanding it to
>> > > > > > > >> > > > > > > > > > > > support custom on-arrival processing. Here is a specific use case I
>> > > > > > > >> > > > > > > > > > > > have in mind. The blog that James referenced describes our auditing
>> > > > > > > >> > > > > > > > > > > > infrastructure. In order to audit the Kafka cluster itself, we need to
>> > > > > > > >> > > > > > > > > > > > run a "console auditor" service that consumes everything and spits out
>> > > > > > > >> > > > > > > > > > > > audit events back to the cluster. I would prefer not having to run this
>> > > > > > > >> > > > > > > > > > > > service because:
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > >    - Well, it is one more service that we have to run and monitor
>> > > > > > > >> > > > > > > > > > > >    - Consuming everything takes up bandwidth, which can be avoided
>> > > > > > > >> > > > > > > > > > > >    - The console auditor consumer itself can lag and cause temporary
>> > > > > > > >> > > > > > > > > > > >      audit discrepancies
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > One way we can mitigate this is by having mirror makers in between
>> > > > > > > >> > > > > > > > > > > > clusters emit audit events. The problem is that the very last cluster
>> > > > > > > >> > > > > > > > > > > > in the pipeline will not have any audit, which is why we need to have
>> > > > > > > >> > > > > > > > > > > > something to audit the cluster.
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > If we had a custom message validator, then the audit could be done on
>> > > > > > > >> > > > > > > > > > > > arrival and we wouldn't need a console auditor.
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > One potential issue with this approach, and with any elaborate
>> > > > > > > >> > > > > > > > > > > > on-arrival processing for that matter, is that you may need to
>> > > > > > > >> > > > > > > > > > > > deserialize the message as well, which can drive up produce request
>> > > > > > > >> > > > > > > > > > > > handling times. However, I'm not terribly concerned about that,
>> > > > > > > >> > > > > > > > > > > > especially if the audit header can be separated out easily or even
>> > > > > > > >> > > > > > > > > > > > deserialized partially, as this Avro thread touches on:
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > Thanks,
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > Joel
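A broker-side version of Joel's on-arrival audit could look roughly like this sketch. It assumes a hook shaped like the `onAppend(String topic, int partition, Records records, long time)` signature Jay suggests, with `List<byte[]>` standing in for `Records`. `BrokerAppendInterceptor`, `OnArrivalAuditor` and the bucket size are hypothetical, and the sketch buckets by broker append time rather than by any timestamp carried inside the message.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical broker hook modeled on the onAppend(topic, partition, records, time)
// shape discussed in the thread; List<byte[]> stands in for Records.
interface BrokerAppendInterceptor {
    void onAppend(String topic, int partition, List<byte[]> records, long timeMs);
}

// Tallies arriving messages per topic and fixed time bucket: the counts a
// separate "console auditor" would otherwise derive by consuming everything.
final class OnArrivalAuditor implements BrokerAppendInterceptor {
    private final long bucketMs;
    // "topic@bucketStartMs" -> message count; TreeMap gives a stable order.
    private final Map<String, Long> counts = new TreeMap<>();

    OnArrivalAuditor(long bucketMs) {
        if (bucketMs <= 0) {
            throw new IllegalArgumentException("bucketMs must be positive");
        }
        this.bucketMs = bucketMs;
    }

    @Override
    public synchronized void onAppend(String topic, int partition, List<byte[]> records, long timeMs) {
        // Align the append time down to the start of its bucket.
        long bucketStart = timeMs - Math.floorMod(timeMs, bucketMs);
        counts.merge(topic + "@" + bucketStart, (long) records.size(), Long::sum);
    }

    // Copy of the current tallies, e.g. to publish as audit events.
    synchronized Map<String, Long> snapshot() {
        return new TreeMap<>(counts);
    }
}
```

Appends from different partitions of the same topic land in the same bucket, since the audit compares per-topic totals across tiers.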
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > Nice KIP. Excellent idea.
>> > > > > > > >> > > > > > > > > > > > > Was just thinking we could add onDequeued() to the ProducerInterceptor
>> > > > > > > >> > > > > > > > > > > > > interface. Since we have onEnqueued(), it will help the client or the
>> > > > > > > >> > > > > > > > > > > > > tools to know how much time the message spent in the RecordAccumulator.
>> > > > > > > >> > > > > > > > > > > > > Also, an API to check if there are any messages left for a particular
>> > > > > > > >> > > > > > > > > > > > > topic in the RecordAccumulator would help.
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > Thanks,
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > Mayuresh
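Mayuresh's two suggestions, an `onDequeued()` hook and a way to ask whether a topic still has records in the `RecordAccumulator`, can be sketched together. The `AccumulatorHooks` interface, its parameters and the per-record id are assumptions made up for illustration; the tracker is a hypothetical interceptor, not code from the producer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical hooks: onEnqueued when a record enters the accumulator,
// onDequeued when it is drained into a produce request. The record id is an
// assumed per-record identifier, not a field of any real producer API.
interface AccumulatorHooks {
    void onEnqueued(long recordId, String topic, long nowMs);

    void onDequeued(long recordId, String topic, long nowMs);
}

// Measures time spent between the two hooks and tracks per-topic backlog.
final class AccumulatorDwellTracker implements AccumulatorHooks {
    private final Map<Long, Long> enqueuedAtMs = new HashMap<>();
    private final Map<String, Integer> pendingByTopic = new HashMap<>();
    private long totalDwellMs = 0;
    private long dequeuedCount = 0;

    @Override
    public synchronized void onEnqueued(long recordId, String topic, long nowMs) {
        enqueuedAtMs.put(recordId, nowMs);
        pendingByTopic.merge(topic, 1, Integer::sum);
    }

    @Override
    public synchronized void onDequeued(long recordId, String topic, long nowMs) {
        Long startMs = enqueuedAtMs.remove(recordId);
        if (startMs == null) {
            return; // never saw the enqueue; skip rather than skew the stats
        }
        totalDwellMs += nowMs - startMs;
        dequeuedCount++;
        // Decrement the backlog; returning null removes the entry at zero.
        pendingByTopic.computeIfPresent(topic, (t, n) -> n > 1 ? n - 1 : null);
    }

    // The "are there any messages left for this topic?" query.
    synchronized boolean hasPending(String topic) {
        return pendingByTopic.containsKey(topic);
    }

    synchronized double averageDwellMs() {
        return dequeuedCount == 0 ? 0.0 : (double) totalDwellMs / dequeuedCount;
    }
}
```

For example, records enqueued at 100 ms and 110 ms and dequeued at 130 ms and 150 ms give dwell times of 30 ms and 40 ms, so `averageDwellMs()` reports 35.0.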
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com> wrote:
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > Great idea. I’ve been talking about this for 2 years, and I’m glad
>> > > > > > > >> > > > > > > > > > > > > > someone is finally picking it up. Will take a look at the KIP at some
>> > > > > > > >> > > > > > > > > > > > > > point shortly.
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > -Todd
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> wrote:
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > Hey Becket,
>> > > > > > > >> > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > Yeah, this is really similar to the callback. The difference is really
>> > > > > > > >> > > > > > > > > > > > > > > in who sets the behavior. The idea of the interceptor is that it
>> > > > > > > >> > > > > > > > > > > > > > > doesn't require any code change in apps, so you can globally add
>> > > > > > > >> > > > > > > > > > > > > > > behavior to your Kafka usage without changing app code, whereas the
>> > > > > > > >> > > > > > > > > > > > > > > callback is added by the app. The idea is to kind of obviate the need
>> > > > > > > >> > > > > > > > > > > > > > > for the wrapper code that, e.g., LinkedIn maintains to hold this kind
>> > > > > > > >> > > > > > > > > > > > > > > of stuff.
>> > > > > > > >> > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > -Jay
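Jay's distinction between the interceptor and the callback is mainly about who installs the behavior. This toy Java sketch makes that concrete under stated assumptions: `ToyProducer`, `AckInterceptor` and `CountingAckInterceptor` are hypothetical, sending is simulated and always succeeds, and the constructor's interceptor list stands in for what a real client would read from configuration rather than from application code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical per-producer hook: installed once, sees every acknowledgement.
interface AckInterceptor {
    void onAcknowledgement(String record, Exception error);
}

// Toy producer: "sending" is simulated and always succeeds. The interceptor
// list stands in for configuration; the callback is chosen by the app per send.
final class ToyProducer {
    private final List<AckInterceptor> interceptors;

    ToyProducer(List<? extends AckInterceptor> configuredInterceptors) {
        this.interceptors = new ArrayList<>(configuredInterceptors);
    }

    void send(String record, BiConsumer<String, Exception> callback) {
        Exception error = null; // simulated successful acknowledgement
        for (AckInterceptor interceptor : interceptors) {
            interceptor.onAcknowledgement(record, error);
        }
        if (callback != null) {
            callback.accept(record, error);
        }
    }
}

// Example interceptor: counts successful acknowledgements across all sends.
final class CountingAckInterceptor implements AckInterceptor {
    int acked = 0;

    @Override
    public void onAcknowledgement(String record, Exception error) {
        if (error == null) {
            acked++;
        }
    }
}
```

With one configured interceptor and a callback attached to only the first of three sends, the interceptor observes all three acknowledgements while the callback fires once, which is the per-producer versus per-record difference Becket asks about.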
>> > > > > > > >> > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket....@gmail.com> wrote:
>> > > > > > > >> > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > This could be a useful feature. And I think there are some use cases
>> > > > > > > >> > > > > > > > > > > > > > > > for mutating the data, like the one mentioned in rejected alternative
>> > > > > > > >> > > > > > > > > > > > > > > > one.
>> > > > > > > >> > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > I am wondering if there is functional overlap between
>> > > > > > > >> > > > > > > > > > > > > > > > ProducerInterceptor.onAcknowledgement() and the producer callback? I
>> > > > > > > >> > > > > > > > > > > > > > > > can see that the Callback could be a per-record setting while
>> > > > > > > >> > > > > > > > > > > > > > > > onAcknowledgement() is a producer-level setting. Other than that, is
>> > > > > > > >> > > > > > > > > > > > > > > > there any difference between them?
>> > > > > > > >> > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > Thanks,
>> > > > > > > >> > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > Jiangjie (Becket) Qin
>> > > > > > > >> > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <n...@confluent.io> wrote:
>> > > > > > > >> > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > James,
>> > > > > > > >> > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > That is one of the many monitoring use cases for the interceptor
>> > > > > > > >> > > > > > > > > > > > > > > > > interface.
>> > > > > > > >> > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > Thanks,
>> > > > > > > >> > > > > > > > > > > > > > > > > Neha
>> > > > > > > >> > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com>
>> > > > > > > >> > > > > > > > > > > > > > > > > wrote:
>> > > > > > > >> > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > > Anna,
>> > > > > > > >> > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > > I'm trying to understand a concrete use case. It sounds like producer
>> > > > > > > >> > > > > > > > > > > > > > > > > > interceptors could be used to implement part of LinkedIn's Kafka Audit
>> > > > > > > >> > > > > > > > > > > > > > > > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale
>> > > > > > > >> > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > > Part of that is done by a wrapper library around the Kafka producer that
>> > > > > > > >> > > > > > > > > > > > > > > > > > keeps a count of the number of messages produced, and then sends that
>> > > > > > > >> > > > > > > > > > > > > > > > > > count to a side-topic. It sounds like the producer interceptors could
>> > > > > > > >> > > > > > > > > > > > > > > > > > possibly be used to implement that?
>> > > > > > > >> > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > > -James
>> > > > > > > >> > > > > > > > > > > > > > > > > >
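The counting James describes might look roughly like the following dependency-free Java sketch. It is neither LinkedIn's audit tool nor the Kafka API: AuditCounterSketch and its methods are hypothetical, and the step that produces each snapshot to a side-topic is left out. The assumption is that recordSent() would be called from ProducerInterceptor.onSend() and recordAcked() from onAcknowledgement(), the two producer-side hooks proposed in KIP-42.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/**
 * Hypothetical counting core of an audit interceptor (not the Kafka API).
 * Assumption: recordSent() is driven by onSend() and recordAcked() by
 * onAcknowledgement(); publishing the snapshot to a side-topic is omitted.
 */
public class AuditCounterSketch {
    private final Map<String, Long> sent = new TreeMap<>();
    private final Map<String, Long> acked = new TreeMap<>();

    // Sends and acknowledgements may arrive on different threads, so guard the maps.
    public synchronized void recordSent(String topic) {
        sent.merge(topic, 1L, Long::sum);
    }

    public synchronized void recordAcked(String topic, Exception error) {
        if (error == null) { // count only successful acknowledgements
            acked.merge(topic, 1L, Long::sum);
        }
    }

    /** Returns one "topic sent=N acked=M" line per topic, then resets for the next window. */
    public synchronized String drainSnapshot() {
        Set<String> topics = new TreeSet<>(sent.keySet());
        topics.addAll(acked.keySet());
        StringBuilder sb = new StringBuilder();
        for (String topic : topics) {
            sb.append(topic)
              .append(" sent=").append(sent.getOrDefault(topic, 0L))
              .append(" acked=").append(acked.getOrDefault(topic, 0L))
              .append('\n');
        }
        sent.clear();
        acked.clear();
        return sb.toString();
    }

    public static void main(String[] args) {
        AuditCounterSketch audit = new AuditCounterSketch();
        audit.recordSent("clicks");
        audit.recordSent("clicks");
        audit.recordSent("views");
        audit.recordAcked("clicks", null);
        audit.recordAcked("clicks", new RuntimeException("send failed"));
        audit.recordAcked("views", null);
        // A real audit pipeline would produce this snapshot to a side-topic instead.
        System.out.print(audit.drainSnapshot());
    }
}
```

Running main prints "clicks sent=2 acked=1" and "views sent=1 acked=1". Keeping sent and successfully acknowledged counts separate lets a downstream auditor compare what the producer attempted with what the brokers confirmed, and resetting on each drain turns the counts into per-window deltas.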
>> > > > > > > >> > > > > > > > > > > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner <a...@confluent.io> wrote:
>> > > > > > > >> > > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > > > Hi,
>> > > > > > > >> > > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > > > I just created KIP-42 for adding producer and consumer interceptors for
>> > > > > > > >> > > > > > > > > > > > > > > > > > > intercepting messages at different points on the producer and consumer.
>> > > > > > > >> > > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
>> > > > > > > >> > > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > > > Comments and suggestions are welcome!
>> > > > > > > >> > > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > > > Thanks,
>> > > > > > > >> > > > > > > > > > > > > > > > > > > Anna
>> > > > > > > >> > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > > > --
>> > > > > > > >> > > > > > > > > > > > > > > > > Thanks,
>> > > > > > > >> > > > > > > > > > > > > > > > > Neha
>> > > > > > > >> > > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > --
>> > > > > > > >> > > > > > > > > > > > > > *—-*
>> > > > > > > >> > > > > > > > > > > > > > *Todd Palino*
>> > > > > > > >> > > > > > > > > > > > > > Staff Site Reliability Engineer
>> > > > > > > >> > > > > > > > > > > > > > Data Infrastructure Streaming
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > > linkedin.com/in/toddpalino
>> > > > > > > >> > > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > > > --
>> > > > > > > >> > > > > > > > > > > > > -Regards,
>> > > > > > > >> > > > > > > > > > > > > Mayuresh R. Gharat
>> > > > > > > >> > > > > > > > > > > > > (862) 250-7125
>> > > > > > > >> > > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > > >
>> > > > > > > >> > > > > > > > > > >
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > > > --
>> > > > > > > >> > > > > > > > > > Thanks,
>> > > > > > > >> > > > > > > > > > Neha
>> > > > > > > >> > > > > > > > > >
>> > > > > > > >> > > > > > > > >
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > --
>> > > > > > > >> > > > > > > > *—-*
>> > > > > > > >> > > > > > > > *Todd Palino*
>> > > > > > > >> > > > > > > > Staff Site Reliability Engineer
>> > > > > > > >> > > > > > > > Data Infrastructure Streaming
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > > > linkedin.com/in/toddpalino
>> > > > > > > >> > > > > > > >
>> > > > > > > >> > > > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > > > --
>> > > > > > > >> > > > > > -Regards,
>> > > > > > > >> > > > > > Mayuresh R. Gharat
>> > > > > > > >> > > > > > (862) 250-7125
>> > > > > > > >> > > > > >
>> > > > > > > >> > > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > > --
>> > > > > > > >> > > > *—-*
>> > > > > > > >> > > > *Todd Palino*
>> > > > > > > >> > > > Staff Site Reliability Engineer
>> > > > > > > >> > > > Data Infrastructure Streaming
>> > > > > > > >> > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > >
>> > > > > > > >> > > > linkedin.com/in/toddpalino
>> > > > > > > >> > > >
>> > > > > > > >> > >
>> > > > > > > >> > >
>> > > > > > > >> > >
>> > > > > > > >> > > --
>> > > > > > > >> > > -Regards,
>> > > > > > > >> > > Mayuresh R. Gharat
>> > > > > > > >> > > (862) 250-7125
>> > > > > > > >> > >
>> > > > > > > >> >
>> > > > > > > >>
>> > > > > > > >>
>> > > > > > > >>
>> > > > > > > >> --
>> > > > > > > >> -Regards,
>> > > > > > > >> Mayuresh R. Gharat
>> > > > > > > >> (862) 250-7125
>> > > > > > > >>
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > Thanks,
>> > > > > > Neha
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> *—-*
>> *Todd Palino*
>> Staff Site Reliability Engineer
>> Data Infrastructure Streaming
>>
>>
>>
>> linkedin.com/in/toddpalino
>>
>
>
