Nice finding on the CRC class. It will be great to switch to that at some
point.
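
For reference, switching could look roughly like the sketch below. It only
uses java.util.zip.CRC32 from the JDK; the class and method names
(JdkCrcSketch, crcOf) are made up for illustration and are not taken from
the Kafka code base.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class JdkCrcSketch {
    // Computes a CRC-32 over the given bytes using the JDK's built-in class.
    public static long crcOf(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        return crc.getValue();
    }

    public static void main(String[] args) {
        // "123456789" is the usual CRC-32 check input.
        byte[] payload = "123456789".getBytes(StandardCharsets.US_ASCII);
        System.out.printf("crc=%08x%n", crcOf(payload));
    }
}
```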

On exposing the CRC - I think we are overthinking the problem. CRC over the
entire record makes sense for the durability check and having multiple CRCs
is a bad idea.
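
To illustrate the trade-off being discussed, here is a small sketch. The
record layout (an 8-byte timestamp followed by the value) and the names
RecordCrcSketch, crc and record are assumptions for illustration only, not
the Kafka message format. It shows that a CRC over the whole record changes
when a system field changes, while a CRC over the value alone does not.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class RecordCrcSketch {
    // CRC-32 over a byte array, using the JDK class.
    public static long crc(byte[] bytes) {
        CRC32 c = new CRC32();
        c.update(bytes, 0, bytes.length);
        return c.getValue();
    }

    // Hypothetical layout (NOT the Kafka wire format):
    // 8-byte timestamp followed by the value bytes.
    public static byte[] record(long timestamp, byte[] value) {
        return ByteBuffer.allocate(8 + value.length)
                .putLong(timestamp)
                .put(value)
                .array();
    }

    public static void main(String[] args) {
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        long withTs1 = crc(record(1L, value));
        long withTs2 = crc(record(2L, value));
        // Same value, different system field: whole-record CRCs differ.
        System.out.println("whole-record CRCs equal: " + (withTs1 == withTs2));
        // The value-only CRC does not depend on the timestamp.
        System.out.println("value-only CRC: " + Long.toHexString(crc(value)));
    }
}
```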

On Fri, Jan 29, 2016 at 4:47 PM, Jay Kreps <j...@confluent.io> wrote:

> The rationale for the CRC covering the whole record was to check corruption
> in the full record contents as corruption there will equally prevent
> someone trying to consume the data. I think you could argue either way but
> let's definitely not end up with two different CRC calculations, that would
> just be indecisive.
>
> That is a super interesting finding about the CRC class. I think you are
> right that it became an intrinsic in java 8 with hw support if available.
> We should really switch to that. The CRC calculation is still one of the
> biggest bottlenecks in the producer and consumer so that should
> significantly speed things up.
>
> -Jay
>
> On Fri, Jan 29, 2016 at 4:30 PM, Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Anna,
> >
> > I think there is value in having a CRC that covers only the user bytes.
> > This will help when we have future protocol updates. Otherwise any
> > protocol migration might break auditing if it relies largely on a CRC
> > that includes system bytes.
> >
> > I ran some tests to understand the performance overhead of having a
> > separate user-bytes CRC.
> >
> > On my desktop, it took ~4.7 seconds to compute the CRC for 10 GB of data,
> > so each megabyte takes about 0.47 ms.
> > To compare with the compression cost, I used the compressor used by the
> > producer. The test uses GZIP and compresses 1 MB of data for each batch.
> > It takes:
> > ~90 seconds to compress 10 GB of data in which every byte is the same.
> > ~470 seconds to compress 10 GB of random bytes.
> > More than 1000 seconds to compress 10 GB of data with 4, 8, 16, 32 and 64
> > patterns. (I guess it takes time to update the pattern mapping.)
> >
> > So the overhead of computing a separate CRC is <1%, considering that the
> > producer does a lot of things other than compression.
> >
> > I am not sure whether this overhead is worth taking, because we have a
> > huge number of messages to send, so any overhead should be avoided if
> > possible.
> >
> > BTW, another interesting finding: the Crc32 class used in the clients
> > used to be faster than the Java CRC32 class in Java 1.6, but that no
> > longer seems to be the case. What I see is that the Java CRC32 class is
> > 2x faster than the Crc32 class we are using now.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Fri, Jan 29, 2016 at 1:46 PM, Anna Povzner <a...@confluent.io> wrote:
> >
> > > Joel, thanks for your feedback. I updated the wiki based on your
> > > comments about the wiki writeup.
> > >
> > >
> > > On Fri, Jan 29, 2016 at 11:50 AM, Anna Povzner <a...@confluent.io>
> > wrote:
> > >
> > > > Becket,
> > > >
> > > > In your scenario with one message from producer A and one message
> > > > from producer B, those are two different messages, and they should be
> > > > tracked as two different messages. So I would argue for using the
> > > > record CRC: it is the CRC that is actually used by the system, and it
> > > > does not require computing a different CRC again, which would add
> > > > performance overhead.
> > > >
> > > > If the broker ever changes the CRC, the scenarios in which that
> > > > happens should be very well defined. As far as I know, these are the
> > > > scenarios in which the CRC is overwritten by the broker (including
> > > > KIP-31/32 changes):
> > > > -- topic config is LogAppendTime for timestamp type
> > > > -- upgrade/downgrade
> > > > -- compression codec change (which could be inferred from config).
> > > >
> > > > Monitoring/audit just needs to know when CRCs are safe to use, which
> > > > is most often known from config. In the future, this can be further
> > > > addressed by broker interceptors.
> > > >
> > > > Thanks,
> > > > Anna
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin <becket....@gmail.com>
> > > wrote:
> > > >
> > > >> Neha,
> > > >>
> > > >> CRC is definitely an important type of metadata of a record. I am
> > > >> not arguing about that. But I think we should distinguish between
> > > >> two types of checksum here: 1) the checksum of user data, and 2) the
> > > >> checksum including system-appended bytes.
> > > >>
> > > >> I completely agree that (1) is good to add. But I am not sure if we
> > > >> should expose (2) to the user, because this means any underlying
> > > >> protocol change will give a different CRC for the exact same
> > > >> message. For example, let's say producer A is sending messages with
> > > >> a timestamp and producer B is sending messages without a timestamp.
> > > >> Even if they are given the exact same message, the CRC returned
> > > >> would be different.
> > > >>
> > > >> Also, the Kafka broker will modify the system-appended bytes in
> > > >> different scenarios, such as a compression codec change or message
> > > >> format conversion (after KIP-31 and KIP-32).
> > > >>
> > > >> So my concern is that we are exposing a CRC that includes
> > > >> system-appended bytes to the user.
> > > >>
> > > >> Other than this I think everything looks good. Nice work, Anna.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >> On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy <jjkosh...@gmail.com>
> > > wrote:
> > > >>
> > > >> > Responding to some of the earlier comments in the thread:
> > > >> >
> > > >> > @Jay/@Neha,
> > > >> >
> > > >> > I think any one of onCommit/onAppend/onArrival would work for the
> > > >> > concrete use-case that I had outlined. I think onArrival is
> > > >> > additionally useful for custom validation, i.e., reject the message
> > > >> > and do not append if it violates some cluster-specific rule (e.g.,
> > > >> > if some header timestamp is older than xyz). However, the thing
> > > >> > with user-supplied validation is that we would have to make do with
> > > >> > a (new) generic error code in the producer response. While there is
> > > >> > a risk of a broker interceptor having high latency, I think that is
> > > >> > acceptable since it is the user's responsibility to ensure low
> > > >> > latency. The producer call-back and onAcknowledgment interceptor
> > > >> > are similar in this regard, although those are less risky. Even so,
> > > >> > I think there are clear use-cases for broker interceptors, so I
> > > >> > feel the risk part is something that just needs to be documented.
> > > >> > @Jay that is a good point about moving from Message/MessageSet to
> > > >> > Records, although that may be less difficult to absorb since it is
> > > >> > a broker-side interceptor and so people don't need to get a ton of
> > > >> > applications in their company to switch to use it.
> > > >> >
> > > >> > Re: onEnqueued: monitoring serialization latency can be done via
> > > >> > metrics, but this is more useful for recording whether
> > > >> > serialization succeeded or not. onAcknowledgment subsumes this, but
> > > >> > it also subsumes other possible errors (such as produce errors). It
> > > >> > is more fine-grained than most people need though (i.e., I don't
> > > >> > think we will use it even if it is present.)
> > > >> >
> > > >> > Re: checksums: I think it is a good addition to metadata, and for
> > > >> > low-volume or super-critical topics it can be used for very strict
> > > >> > auditing.
> > > >> >
> > > >> > There are a couple of typos/edits for the wiki itself:
> > > >> >
> > > >> >    - Under Kafka Producer changes:
> > > >> >       - you have references to KafkaConsumer constructor and
> > > >> >         ConsumerConfig.originals.
> > > >> >       - sendRecord -> sentRecord (may be clearer)
> > > >> >    - Under ProducerInterceptor interface: there is a mention of
> > > >> >      onEnqueued which was rejected
> > > >> >    - Comment for ConsumerRecord.record should probably be: // NEW:
> > > >> >      record size in bytes (*after decompression*)
> > > >> >
> > > >> >
> > > >> > BTW - Anna, nice work on the KIP!
> > > >> >
> > > >> > Joel
> > > >> >
> > > >> > On Fri, Jan 29, 2016 at 6:57 AM, Neha Narkhede <n...@confluent.io
> >
> > > >> wrote:
> > > >> >
> > > >> > > Becket,
> > > >> > >
> > > >> > > Is your concern the presence of CRC in the RecordMetadata, or do
> > > >> > > you want to brainstorm how CRC can be used for auditing? I think
> > > >> > > we shouldn't try to think about the various ways that people can
> > > >> > > do monitoring using interceptors and the metadata we provide. The
> > > >> > > entire point of having pluggable interceptors is so that people
> > > >> > > can employ their own creative mechanisms to make use of
> > > >> > > interceptors.
> > > >> > >
> > > >> > > I do think that it is worth discussing whether or not CRC makes
> > > >> > > sense as record metadata to the user. My take is that the CRC is
> > > >> > > the best size-bound summary of serialized record content
> > > >> > > available to us, and it is expensive to recompute if the user
> > > >> > > were to redo it. I'd argue this summary of a record qualifies as
> > > >> > > its metadata. After all, we use the record CRC for a very
> > > >> > > important test of the system's durability as the record travels
> > > >> > > through the system.
> > > >> > >
> > > >> > > > 1. Doesn't the TopicPartition + Offset already uniquely
> > > >> > > > identify a message? It seems better than CRC from either a
> > > >> > > > summary point of view or an auditing point of view.
> > > >> > >
> > > >> > >
> > > >> > > The offset is a system-assigned value of uniqueness to the
> > > >> > > message. If you trusted the system that much, you are not looking
> > > >> > > to monitor it out-of-band :-)
> > > >> > >
> > > >> > >
> > > >> > > > 2. Currently the CRC only has 4 bytes, so it will have
> > > >> > > > collisions when there are more than ~4 billion messages. Take
> > > >> > > > LinkedIn as an example: we have 1.3 trillion messages per day.
> > > >> > > > So there will be at least a couple of hundred collisions for
> > > >> > > > each CRC code every day, whereas TopicPartition+Offset will
> > > >> > > > not have any collisions.
> > > >> > >
> > > >> > >
> > > >> > > The CRC isn't sent over the wire and doesn't add any extra
> > > >> > > overhead in processing, so what is your concern? If you aren't
> > > >> > > convinced about its usefulness, you can always use the default
> > > >> > > do-nothing interceptor at LinkedIn and ignore the CRC.
> > > >> > >
> > > >> > > > Without having the entire message bytes, they may not be able
> > > >> > > > to verify its correctness, and the CRC could even be invalid
> > > >> > > > if the broker ever overwrote any field or did format
> > > >> > > > conversion.
> > > >> > > >
> > > >> > >
> > > >> > > This doesn't make sense to me. The CRC is used for the most
> > > >> > > important durability check by Kafka: to verify that the message
> > > >> > > was not garbled over the wire. The system can't change it; it has
> > > >> > > to match on the consumer side or we will not return it to the
> > > >> > > user.
> > > >> > >
> > > >> > > On Fri, Jan 29, 2016 at 3:23 AM, Becket Qin <
> becket....@gmail.com
> > >
> > > >> > wrote:
> > > >> > >
> > > >> > > > Anna,
> > > >> > > >
> > > >> > > > It is still not clear to me why we should expose the CRC to
> > > >> > > > the end user. The following are my points of confusion.
> > > >> > > >
> > > >> > > > 1. Doesn't the TopicPartition + Offset already uniquely
> > > >> > > > identify a message? It seems better than CRC from either a
> > > >> > > > summary point of view or an auditing point of view.
> > > >> > > >
> > > >> > > > 2. Currently the CRC only has 4 bytes, so it will have
> > > >> > > > collisions when there are more than ~4 billion messages. Take
> > > >> > > > LinkedIn as an example: we have 1.3 trillion messages per day.
> > > >> > > > So there will be at least a couple of hundred collisions for
> > > >> > > > each CRC code every day, whereas TopicPartition+Offset will
> > > >> > > > not have any collisions.
> > > >> > > >
> > > >> > > > 3. The CRC is calculated after all the fields have been filled
> > > >> > > > in by the producer, including timestamp, attributes, etc. It
> > > >> > > > might also get recomputed on the broker side. So if users only
> > > >> > > > get the CRC from record metadata, without having the entire
> > > >> > > > message bytes, they may not be able to verify its correctness,
> > > >> > > > and the CRC could even be invalid if the broker ever overwrote
> > > >> > > > any field or did format conversion.
> > > >> > > >
> > > >> > > > Thanks,
> > > >> > > >
> > > >> > > > Jiangjie (Becket) Qin
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > > On Thu, Jan 28, 2016 at 5:58 PM, Anna Povzner <
> > a...@confluent.io>
> > > >> > wrote:
> > > >> > > >
> > > >> > > > > On second thought, yes, I think we should expose the record
> > > >> > > > > size that represents application bytes. This is Becket's
> > > >> > > > > option #1.
> > > >> > > > >
> > > >> > > > > I updated the KIP wiki with new fields in RecordMetadata and
> > > >> > > > > ConsumerRecord.
> > > >> > > > >
> > > >> > > > > I would like to start a voting thread tomorrow if there are
> > > >> > > > > no objections or more things to discuss.
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > > Anna
> > > >> > > > >
> > > >> > > > >
> > > >> > > > >
> > > >> > > > > On Thu, Jan 28, 2016 at 2:25 PM, Anna Povzner <
> > > a...@confluent.io>
> > > >> > > wrote:
> > > >> > > > >
> > > >> > > > > > Regarding record size as bytes sent over the wire, my
> > > >> > > > > > concern is that it is almost impossible to calculate
> > > >> > > > > > per-record. We could compute it as: 1) compressed bytes /
> > > >> > > > > > number of records in a compressed message, as Todd
> > > >> > > > > > mentioned; or 2) the same as #1 but proportional to the
> > > >> > > > > > uncompressed record size vs. the total uncompressed size of
> > > >> > > > > > records. All of these calculations give us an estimate. So
> > > >> > > > > > maybe record size as bytes sent over the wire is not
> > > >> > > > > > per-record metadata, but rather a per topic/partition
> > > >> > > > > > measure that is better exposed through metrics?
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino <
> > > tpal...@gmail.com
> > > >> >
> > > >> > > > wrote:
> > > >> > > > > >
> > > >> > > > > >> It may be difficult (or nearly impossible) to get actual
> > > >> > > > > >> compressed bytes for a message from a compressed batch,
> > > >> > > > > >> but I do think it’s useful information to have available
> > > >> > > > > >> for the very reason noted, bandwidth consumed. Does it
> > > >> > > > > >> make sense to have an interceptor at the batch level that
> > > >> > > > > >> can provide this? The other option is to estimate it
> > > >> > > > > >> (such as making an assumption that the messages in a
> > > >> > > > > >> batch are equal in size, which is not necessarily true),
> > > >> > > > > >> which is probably not the right answer.
> > > >> > > > > >>
> > > >> > > > > >> -Todd
> > > >> > > > > >>
> > > >> > > > > >>
> > > >> > > > > >> On Thu, Jan 28, 2016 at 1:48 PM, Anna Povzner <
> > > >> a...@confluent.io>
> > > >> > > > > wrote:
> > > >> > > > > >>
> > > >> > > > > >> > Hi Becket,
> > > >> > > > > >> >
> > > >> > > > > >> > It will be up to the interceptor to implement its
> > > >> > > > > >> > audit or monitoring strategy. I would also imagine
> > > >> > > > > >> > there is more than one good way to do audit. So, I
> > > >> > > > > >> > agree that some of the interceptors may not use CRC,
> > > >> > > > > >> > and we will not require it. The question is now whether
> > > >> > > > > >> > intercepting CRCs is needed. I think they are very
> > > >> > > > > >> > useful for monitoring and audit, because CRC provides
> > > >> > > > > >> > an easy way to get a summary of a message, rather than
> > > >> > > > > >> > using message bytes or key/value objects.
> > > >> > > > > >> >
> > > >> > > > > >> > Regarding record size, I agree that the bandwidth
> > > >> > > > > >> > example was not a good one. I think it would be hard to
> > > >> > > > > >> > get actual bytes sent over the wire (your #2), since
> > > >> > > > > >> > multiple records get compressed together and we would
> > > >> > > > > >> > need to decide which bytes to account to which record.
> > > >> > > > > >> > So I am inclined to only do your #1. However, it still
> > > >> > > > > >> > makes more sense to me just to return record size
> > > >> > > > > >> > including the header, since this is the actual record
> > > >> > > > > >> > size.
> > > >> > > > > >> >
> > > >> > > > > >> > Thanks,
> > > >> > > > > >> > Anna
> > > >> > > > > >> >
> > > >> > > > > >> > On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin <
> > > >> > > becket....@gmail.com>
> > > >> > > > > >> wrote:
> > > >> > > > > >> >
> > > >> > > > > >> > > Anna,
> > > >> > > > > >> > >
> > > >> > > > > >> > > Using CRC to do end-to-end auditing might be very
> > > >> > > > > >> > > costly because you will need to collect all the CRCs
> > > >> > > > > >> > > from both producer and consumer. And it is based on
> > > >> > > > > >> > > the assumption that the broker does not modify the
> > > >> > > > > >> > > record.
> > > >> > > > > >> > > Can you shed some light on how end-to-end auditing
> > > >> > > > > >> > > will use the CRC before we decide to expose such a
> > > >> > > > > >> > > low-level detail to the end user? It would also be
> > > >> > > > > >> > > helpful if you can compare it with something like
> > > >> > > > > >> > > sequence-number-based auditing.
> > > >> > > > > >> > >
> > > >> > > > > >> > > About the record size, one thing worth noting is that
> > > >> > > > > >> > > the size of Record is not the actual bytes sent over
> > > >> > > > > >> > > the wire if we use compression. So that does not
> > > >> > > > > >> > > really tell the user how much bandwidth they are
> > > >> > > > > >> > > using. Personally I think two kinds of size may be
> > > >> > > > > >> > > useful.
> > > >> > > > > >> > > 1. The record size after serialization, i.e.
> > > >> > > > > >> > > application bytes. (The uncompressed record size can
> > > >> > > > > >> > > be easily derived as well)
> > > >> > > > > >> > > 2. The actual bytes sent over the wire.
> > > >> > > > > >> > > We can get (1) easily, but (2) is difficult to get at
> > > >> > > > > >> > > Record level when we use compression.
> > > >> > > > > >> > >
> > > >> > > > > >> > > Thanks,
> > > >> > > > > >> > >
> > > >> > > > > >> > > Jiangjie (Becket) Qin
> > > >> > > > > >> > >
> > > >> > > > > >> > > On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner <
> > > >> > > a...@confluent.io
> > > >> > > > >
> > > >> > > > > >> > wrote:
> > > >> > > > > >> > >
> > > >> > > > > >> > > > Hi Becket,
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > The use-case for CRC is end-to-end audit, rather
> > > >> > > > > >> > > > than checking whether a single message is corrupt
> > > >> > > > > >> > > > or not.
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > Regarding record size, I was thinking of extracting
> > > >> > > > > >> > > > the record size from Record. That will include
> > > >> > > > > >> > > > header overhead as well. I think total record size
> > > >> > > > > >> > > > will tell users how much bandwidth their messages
> > > >> > > > > >> > > > take. Since the header is relatively small and
> > > >> > > > > >> > > > constant, users will also get an idea of their
> > > >> > > > > >> > > > key/value sizes.
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > Thanks,
> > > >> > > > > >> > > > Anna
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin <
> > > >> > > > becket....@gmail.com
> > > >> > > > > >
> > > >> > > > > >> > > wrote:
> > > >> > > > > >> > > >
> > > >> > > > > >> > > > > I am +1 on #1.2 and #3.
> > > >> > > > > >> > > > >
> > > >> > > > > >> > > > > #2: Regarding CRC, I am not sure if users care
> > > >> > > > > >> > > > > about CRC. Is there any specific use case?
> > > >> > > > > >> > > > > Currently we validate messages by calling
> > > >> > > > > >> > > > > ensureValid() to verify the checksum and throw
> > > >> > > > > >> > > > > an exception if it does not match.
> > > >> > > > > >> > > > > Message size would be useful. We can add that to
> > > >> > > > > >> > > > > ConsumerRecord. Can you clarify the message size
> > > >> > > > > >> > > > > you are referring to? Does it include the message
> > > >> > > > > >> > > > > header overhead or not? From the user's point of
> > > >> > > > > >> > > > > view, they probably don't care about header size.
> > > >> > > > > >> > > > >
> > > >> > > > > >> > > > > Thanks,
> > > >> > > > > >> > > > >
> > > >> > > > > >> > > > > Jiangjie (Becket) Qin
> > > >> > > > > >> > > > >
> > > >> > > > > >> > > > >
> > > >> > > > > >> > > > > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede <
> > > >> > > > > n...@confluent.io
> > > >> > > > > >> >
> > > >> > > > > >> > > > wrote:
> > > >> > > > > >> > > > >
> > > >> > > > > >> > > > > > Anna,
> > > >> > > > > >> > > > > >
> > > >> > > > > >> > > > > > Thanks for being diligent.
> > > >> > > > > >> > > > > >
> > > >> > > > > >> > > > > > +1 on #1.2 and sounds good on #3. I recommend
> > > >> > > > > >> > > > > > adding checksum and size fields to
> > > >> > > > > >> > > > > > RecordMetadata and ConsumerRecord instead of
> > > >> > > > > >> > > > > > exposing metadata piecemeal in the interceptor
> > > >> > > > > >> > > > > > APIs.
> > > >> > > > > >> > > > > >
> > > >> > > > > >> > > > > > Thanks,
> > > >> > > > > >> > > > > > Neha
> > > >> > > > > >> > > > > >
> > > >> > > > > >> > > > > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner <
> > > >> > > > > >> a...@confluent.io>
> > > >> > > > > >> > > > wrote:
> > > >> > > > > >> > > > > >
> > > >> > > > > >> > > > > > > Hi All,
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > The KIP wiki page is now up-to-date with the
> > > >> > > > > >> > > > > > > scope we have agreed on: Producer and Consumer
> > > >> > > > > >> > > > > > > Interceptors with a minimal set of mutable
> > > >> > > > > >> > > > > > > APIs that are not dependent on producer and
> > > >> > > > > >> > > > > > > consumer internal implementation.
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > I have a few more API details that I would
> > > >> > > > > >> > > > > > > like to bring attention to and/or discuss:
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > 1. Handling exceptions
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > Exceptions can provide an additional level of
> > > >> > > > > >> > > > > > > control. For example, we can filter messages on
> > > >> > > > > >> > > > > > > the consumer side or stop messages on the
> > > >> > > > > >> > > > > > > producer if they don’t have the right field.
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > I see two options:
> > > >> > > > > >> > > > > > > 1.1. For callbacks that can mutate records
> > > >> > > > > >> > > > > > > (onSend and onConsume), propagate exceptions
> > > >> > > > > >> > > > > > > through the original calls
> > > >> > > > > >> > > > > > > (KafkaProducer.send() and KafkaConsumer.poll()
> > > >> > > > > >> > > > > > > respectively). For other callbacks, catch the
> > > >> > > > > >> > > > > > > exception, log, and ignore.
> > > >> > > > > >> > > > > > > 1.2. Catch exceptions from all the interceptor
> > > >> > > > > >> > > > > > > callbacks and ignore.
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > The issue with 1.1 is that it effectively
> > > >> > > > > >> > > > > > > changes the KafkaProducer.send() and
> > > >> > > > > >> > > > > > > KafkaConsumer.poll() API, since now they may
> > > >> > > > > >> > > > > > > throw exceptions that are not documented in the
> > > >> > > > > >> > > > > > > KafkaProducer/Consumer API. Another option is
> > > >> > > > > >> > > > > > > to allow some exceptions to propagate, and
> > > >> > > > > >> > > > > > > ignore others.
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > I think our use-cases do not require
> > > >> > > > > >> > > > > > > propagating exceptions. So, I propose option
> > > >> > > > > >> > > > > > > 1.2, unless someone has suggestions/use-cases
> > > >> > > > > >> > > > > > > for propagating exceptions. Please let me know.
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > 2. Intercepting record CRC and record size
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > Since we decided not to add any intermediate
> > > >> > > > > >> > > > > > > callbacks (such as onEnqueue or onReceive) to
> > > >> > > > > >> > > > > > > interceptors, I think it is still valuable to
> > > >> > > > > >> > > > > > > intercept record CRC and record size in bytes
> > > >> > > > > >> > > > > > > for monitoring and audit use-cases.
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > I propose to add checksum and size fields to
> > > >> > > > > >> > > > > > > RecordMetadata and ConsumerRecord. Another
> > > >> > > > > >> > > > > > > option would be to add them as parameters in
> > > >> > > > > >> > > > > > > onAcknowledgement() and onConsume() callbacks.
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > 3. Callbacks that allow modifying records look as follows:
> > > >> > > > > >> > > > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> > > >> > > > > >> > > > > > > ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > This means that interceptors can potentially
> > > >> > > > > >> > > > > > > modify topic/partition in ProducerRecord and
> > > >> > > > > >> > > > > > > topic/partition/offset in ConsumerRecord. I
> > > >> > > > > >> > > > > > > propose that it is up to the interceptor
> > > >> > > > > >> > > > > > > implementation to ensure that topic/partition,
> > > >> > > > > >> > > > > > > etc. is correct. KafkaProducer.send() will use
> > > >> > > > > >> > > > > > > topic, partition, key, and value from the
> > > >> > > > > >> > > > > > > ProducerRecord returned from onSend().
> > > >> > > > > >> > > > > > > Similarly, ConsumerRecords returned from
> > > >> > > > > >> > > > > > > KafkaConsumer.poll() would be the ones returned
> > > >> > > > > >> > > > > > > from the interceptor.
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > Please let me know if you have any suggestions
> > > >> > > > > >> > > > > > > or objections to the above.
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > Thanks,
> > > >> > > > > >> > > > > > > Anna
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > On Wed, Jan 27, 2016 at 2:56 PM, Anna
> Povzner <
> > > >> > > > > >> a...@confluent.io
> > > >> > > > > >> > >
> > > >> > > > > >> > > > > wrote:
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > > Hi Mayuresh,
> > > >> > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > > I see why you would want to check for
> > messages
> > > >> left
> > > >> > in
> > > >> > > > the
> > > >> > > > > >> > > > > > > > RecordAccumulator. However, I don't think
> > this
> > > >> will
> > > >> > > > > >> completely
> > > >> > > > > >> > > > solve
> > > >> > > > > >> > > > > > the
> > > >> > > > > >> > > > > > > > problem. Messages could be in-flight
> > somewhere
> > > >> else,
> > > >> > > > like
> > > >> > > > > in
> > > >> > > > > >> > the
> > > >> > > > > >> > > > > > socket,
> > > >> > > > > >> > > > > > > or
> > > >> > > > > >> > > > > > > > there maybe in-flight messages on the
> > consumer
> > > >> side
> > > >> > of
> > > >> > > > the
> > > >> > > > > >> > > > > MirrorMaker.
> > > >> > > > > >> > > > > > > So,
> > > >> > > > > >> > > > > > > > if we go the route of checking whether
> there
> > > are
> > > >> any
> > > >> > > > > >> in-flight
> > > >> > > > > >> > > > > messages
> > > >> > > > > >> > > > > > > for
> > > >> > > > > >> > > > > > > > topic deletion use-case, maybe it is better
> > > count
> > > >> > them
> > > >> > > > > with
> > > >> > > > > >> > > > onSend()
> > > >> > > > > >> > > > > > and
> > > >> > > > > >> > > > > > > > onAcknowledge() -- whether all messages
> sent
> > > were
> > > >> > > > > >> > acknowledged. I
> > > >> > > > > >> > > > > also
> > > >> > > > > >> > > > > > > > think that it would be better to solve this
> > > >> without
> > > >> > > > > >> > interceptors,
> > > >> > > > > >> > > > > such
> > > >> > > > > >> > > > > > as
> > > >> > > > > >> > > > > > > > fixing error handling in this scenario.
> > However, I
> > > >> do
> > > >> > not
> > > >> > > > > have
> > > >> > > > > >> any
> > > >> > > > > >> > > > good
> > > >> > > > > >> > > > > > > > proposal right now, so these are just
> general
> > > >> > > thoughts.
> > > >> > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > > Thanks,
> > > >> > > > > >> > > > > > > > Anna
> > > >> > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh
> > > >> Gharat <
> > > >> > > > > >> > > > > > > > gharatmayures...@gmail.com> wrote:
> > > >> > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> Calling producer.flush() flushes all the
> > > data.
> > > >> So
> > > >> > > this
> > > >> > > > > is
> > > >> > > > > >> OK.
> > > >> > > > > >> > > But
> > > >> > > > > >> > > > > > when
> > > >> > > > > >> > > > > > > >> you
> > > >> > > > > >> > > > > > > >> are running Mirror maker, I am not sure
> > there
> > > >> is a
> > > >> > > way
> > > >> > > > to
> > > >> > > > > >> > > flush()
> > > >> > > > > >> > > > > from
> > > >> > > > > >> > > > > > > >> outside.
> > > >> > > > > >> > > > > > > >>
> > > >> > > > > >> > > > > > > >>
> > > >> > > > > >> > > > > > > >> Thanks,
> > > >> > > > > >> > > > > > > >>
> > > >> > > > > >> > > > > > > >> Mayuresh
> > > >> > > > > >> > > > > > > >>
> > > >> > > > > >> > > > > > > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket
> > Qin <
> > > >> > > > > >> > > > becket....@gmail.com>
> > > >> > > > > >> > > > > > > >> wrote:
> > > >> > > > > >> > > > > > > >>
> > > >> > > > > >> > > > > > > >> > Mayuresh,
> > > >> > > > > >> > > > > > > >> >
> > > >> > > > > >> > > > > > > >> > Regarding your use case about mirror
> > maker.
> > > >> Is it
> > > >> > > > good
> > > >> > > > > >> > enough
> > > >> > > > > >> > > as
> > > >> > > > > >> > > > > > long
> > > >> > > > > >> > > > > > > >> as we
> > > >> > > > > >> > > > > > > >> > know there is no message for the topic
> in
> > > the
> > > >> > > > producer
> > > >> > > > > >> > > anymore?
> > > >> > > > > >> > > > If
> > > >> > > > > >> > > > > > > that
> > > >> > > > > >> > > > > > > >> is
> > > >> > > > > >> > > > > > > >> > the case, calling producer.flush() is
> > > sufficient.
> > > >> > > > > >> > > > > > > >> >
> > > >> > > > > >> > > > > > > >> > Thanks,
> > > >> > > > > >> > > > > > > >> >
> > > >> > > > > >> > > > > > > >> > Jiangjie (Becket) Qin
> > > >> > > > > >> > > > > > > >> >
> > > >> > > > > >> > > > > > > >> > On Tue, Jan 26, 2016 at 6:18 PM,
> Mayuresh
> > > >> Gharat
> > > >> > <
> > > >> > > > > >> > > > > > > >> > gharatmayures...@gmail.com
> > > >> > > > > >> > > > > > > >> > > wrote:
> > > >> > > > > >> > > > > > > >> >
> > > >> > > > > >> > > > > > > >> > > Hi Anna,
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > > Thanks a lot for summarizing the
> > > discussion
> > > >> on
> > > >> > > this
> > > >> > > > > >> kip.
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > > It LGTM.
> > > >> > > > > >> > > > > > > >> > > This is really nice :
> > > >> > > > > >> > > > > > > >> > > We decided not to add any callbacks to
> > > >> producer
> > > >> > > and
> > > >> > > > > >> > consumer
> > > >> > > > > >> > > > > > > >> > > interceptors that will depend on
> > internal
> > > >> > > > > >> implementation
> > > >> > > > > >> > as
> > > >> > > > > >> > > > part
> > > >> > > > > >> > > > > > of
> > > >> > > > > >> > > > > > > >> this
> > > >> > > > > >> > > > > > > >> > > KIP.
> > > >> > > > > >> > > > > > > >> > > *However, it is possible to add them
> > later
> > > >> as
> > > >> > > part
> > > >> > > > of
> > > >> > > > > >> > > another
> > > >> > > > > >> > > > > KIP
> > > >> > > > > >> > > > > > if
> > > >> > > > > >> > > > > > > >> > there
> > > >> > > > > >> > > > > > > >> > > are good use-cases.*
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > > Do you agree with the use case I
> > explained
> > > >> > > earlier
> > > >> > > > > for
> > > >> > > > > >> > > knowing
> > > >> > > > > >> > > > > the
> > > >> > > > > >> > > > > > > >> number
> > > >> > > > > >> > > > > > > >> > > of records left in the
> RecordAccumulator
> > > >> for a
> > > >> > > > > >> particular
> > > >> > > > > >> > > > topic?
> > > >> > > > > >> > > > > > It
> > > >> > > > > >> > > > > > > >> might
> > > >> > > > > >> > > > > > > >> > > be orthogonal to this KIP, but will be
> > > >> helpful.
> > > >> > > > What
> > > >> > > > > do
> > > >> > > > > >> > you
> > > >> > > > > >> > > > > think?
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > > Thanks,
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > > Mayuresh
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > > On Tue, Jan 26, 2016 at 2:46 PM, Todd
> > > >> Palino <
> > > >> > > > > >> > > > tpal...@gmail.com
> > > >> > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> wrote:
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > > > This looks good. As noted, having
> one
> > > >> mutable
> > > >> > > > > >> > interceptor
> > > >> > > > > >> > > on
> > > >> > > > > >> > > > > > each
> > > >> > > > > >> > > > > > > >> side
> > > >> > > > > >> > > > > > > >> > > > allows for the use cases we can
> > envision
> > > >> > right
> > > >> > > > now,
> > > >> > > > > >> and
> > > >> > > > > >> > I
> > > >> > > > > >> > > > > think
> > > >> > > > > >> > > > > > > >> that’s
> > > >> > > > > >> > > > > > > >> > > > going to provide a great deal of
> > > >> opportunity
> > > >> > > for
> > > >> > > > > >> > > > implementing
> > > >> > > > > >> > > > > > > things
> > > >> > > > > >> > > > > > > >> > like
> > > >> > > > > >> > > > > > > >> > > > audit, especially within a
> > multi-tenant
> > > >> > > > > environment.
> > > >> > > > > >> > > Looking
> > > >> > > > > >> > > > > > > >> forward to
> > > >> > > > > >> > > > > > > >> > > > getting this available in the
> clients.
> > > >> > > > > >> > > > > > > >> > > >
> > > >> > > > > >> > > > > > > >> > > > Thanks!
> > > >> > > > > >> > > > > > > >> > > >
> > > >> > > > > >> > > > > > > >> > > > -Todd
> > > >> > > > > >> > > > > > > >> > > >
> > > >> > > > > >> > > > > > > >> > > >
> > > >> > > > > >> > > > > > > >> > > > On Tue, Jan 26, 2016 at 2:36 PM,
> Anna
> > > >> > Povzner <
> > > >> > > > > >> > > > > > a...@confluent.io>
> > > >> > > > > >> > > > > > > >> > wrote:
> > > >> > > > > >> > > > > > > >> > > >
> > > >> > > > > >> > > > > > > >> > > > > Hi All,
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > Here are the meeting notes from today’s
> > KIP
> > > >> > > meeting:
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > 1. We agreed to keep the scope of
> > this
> > > >> KIP
> > > >> > to
> > > >> > > > be
> > > >> > > > > >> > > producer
> > > >> > > > > >> > > > > and
> > > >> > > > > >> > > > > > > >> > consumer
> > > >> > > > > >> > > > > > > >> > > > > interceptors only. Broker-side
> > > >> interceptor
> > > >> > > will
> > > >> > > > > be
> > > >> > > > > >> > added
> > > >> > > > > >> > > > > later
> > > >> > > > > >> > > > > > > as
> > > >> > > > > >> > > > > > > >> a
> > > >> > > > > >> > > > > > > >> > > > > separate KIP. The reasons were
> > already
> > > >> > > > mentioned
> > > >> > > > > in
> > > >> > > > > >> > this
> > > >> > > > > >> > > > > > thread,
> > > >> > > > > >> > > > > > > >> but
> > > >> > > > > >> > > > > > > >> > > the
> > > >> > > > > >> > > > > > > >> > > > > summary is:
> > > >> > > > > >> > > > > > > >> > > > >  * Broker interceptor is riskier
> and
> > > >> > requires
> > > >> > > > > >> careful
> > > >> > > > > >> > > > > > > >> consideration
> > > >> > > > > >> > > > > > > >> > > about
> > > >> > > > > >> > > > > > > >> > > > > overheads, whether to intercept
> > > leaders
> > > >> vs.
> > > >> > > > > >> > > > > leaders/replicas,
> > > >> > > > > >> > > > > > > >> what to
> > > >> > > > > >> > > > > > > >> > > do
> > > >> > > > > >> > > > > > > >> > > > on
> > > >> > > > > >> > > > > > > >> > > > > leader failover and so on.
> > > >> > > > > >> > > > > > > >> > > > >  * Broker interceptors increase
> > > >> monitoring
> > > >> > > > > >> resolution,
> > > >> > > > > >> > > but
> > > >> > > > > >> > > > > not
> > > >> > > > > >> > > > > > > >> > > including
> > > >> > > > > >> > > > > > > >> > > > it
> > > >> > > > > >> > > > > > > >> > > > > in this KIP does not reduce
> > usefulness
> > > >> of
> > > >> > > > > producer
> > > >> > > > > >> and
> > > >> > > > > >> > > > > > consumer
> > > >> > > > > >> > > > > > > >> > > > > interceptors that enable
> end-to-end
> > > >> > > monitoring
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > 2. We agreed to scope
> > > >> ProducerInterceptor
> > > >> > and
> > > >> > > > > >> > > > > > > ConsumerInterceptor
> > > >> > > > > >> > > > > > > >> > > > callbacks
> > > >> > > > > >> > > > > > > >> > > > > to a minimal set of mutable APIs that
> > are
> > > >> not
> > > >> > > > > >> dependent
> > > >> > > > > >> > on
> > > >> > > > > >> > > > > > producer
> > > >> > > > > >> > > > > > > >> and
> > > >> > > > > >> > > > > > > >> > > > > consumer internal implementation.
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > ProducerInterceptor:
> > > >> > > > > >> > > > > > > >> > > > > *ProducerRecord<K, V>
> > > >> > > onSend(ProducerRecord<K,
> > > >> > > > V>
> > > >> > > > > >> > > > record);*
> > > >> > > > > >> > > > > > > >> > > > > *void
> > onAcknowledgement(RecordMetadata
> > > >> > > > metadata,
> > > >> > > > > >> > > Exception
> > > >> > > > > >> > > > > > > >> > exception);*
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > ConsumerInterceptor:
> > > >> > > > > >> > > > > > > >> > > > > *ConsumerRecords<K, V>
> > > >> > > > > >> onConsume(ConsumerRecords<K, V>
> > > >> > > > > >> > > > > > > records);*
> > > >> > > > > >> > > > > > > >> > > > > *void onCommit(Map<TopicPartition,
> > > >> > > > > >> OffsetAndMetadata>
> > > >> > > > > >> > > > > > offsets);*
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > We will allow interceptors to
> modify
> > > >> > > > > >> ProducerRecord on
> > > >> > > > > >> > > > > > producer
> > > >> > > > > >> > > > > > > >> side,
> > > >> > > > > >> > > > > > > >> > > and
> > > >> > > > > >> > > > > > > >> > > > > modify ConsumerRecords on consumer
> > > side.
> > > >> > This
> > > >> > > > > will
> > > >> > > > > >> > > support
> > > >> > > > > >> > > > > > > >> end-to-end
> > > >> > > > > >> > > > > > > >> > > > > monitoring and auditing and
> support
> > > the
> > > >> > > ability
> > > >> > > > > to
> > > >> > > > > >> add
> > > >> > > > > >> > > > > > metadata
> > > >> > > > > >> > > > > > > >> for a
> > > >> > > > > >> > > > > > > >> > > > > message. This will support Todd’s
> > > >> Auditing
> > > >> > > and
> > > >> > > > > >> Routing
> > > >> > > > > >> > > > > > > use-cases.
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > We did not find any use-case for
> > > >> modifying
> > > >> > > > > records
> > > >> > > > > >> in
> > > >> > > > > >> > > > > > > onConsume()
> > > >> > > > > >> > > > > > > >> > > > callback,
> > > >> > > > > >> > > > > > > >> > > > > but decided to enable modification
> > of
> > > >> > > consumer
> > > >> > > > > >> records
> > > >> > > > > >> > > for
> > > >> > > > > >> > > > > > > >> symmetry
> > > >> > > > > >> > > > > > > >> > > with
> > > >> > > > > >> > > > > > > >> > > > > onSend().
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > 3. We agreed to ensure
> compatibility
> > > >> > when/if
> > > >> > > we
> > > >> > > > > add
> > > >> > > > > >> > new
> > > >> > > > > >> > > > > > methods
> > > >> > > > > >> > > > > > > to
> > > >> > > > > >> > > > > > > >> > > > > ProducerInterceptor and
> > > >> ConsumerInterceptor
> > > >> > > by
> > > >> > > > > >> using
> > > >> > > > > >> > > > default
> > > >> > > > > >> > > > > > > >> methods
> > > >> > > > > >> > > > > > > >> > > with
> > > >> > > > > >> > > > > > > >> > > > > an empty implementation. Ok to
> > assume
> > > >> Java
> > > >> > 8.
> > > >> > > > > >> (This is
> > > >> > > > > >> > > > > > Ismael’s
> > > >> > > > > >> > > > > > > >> > method
> > > >> > > > > >> > > > > > > >> > > > #2).
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > 4. We decided not to add any
> > callbacks
> > > >> to
> > > >> > > > > producer
> > > >> > > > > >> and
> > > >> > > > > >> > > > > > consumer
> > > >> > > > > >> > > > > > > >> > > > > interceptors that will depend on
> > > >> internal
> > > >> > > > > >> > implementation
> > > >> > > > > >> > > > as
> > > >> > > > > >> > > > > > part
> > > >> > > > > >> > > > > > > >> of
> > > >> > > > > >> > > > > > > >> > > this
> > > >> > > > > >> > > > > > > >> > > > > KIP. However, it is possible to
> add
> > > them
> > > >> > > later
> > > >> > > > as
> > > >> > > > > >> part
> > > >> > > > > >> > > of
> > > >> > > > > >> > > > > > > another
> > > >> > > > > >> > > > > > > >> KIP
> > > >> > > > > >> > > > > > > >> > > if
> > > >> > > > > >> > > > > > > >> > > > > there are good use-cases.
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > *Reasoning.* We did not have
> > concrete
> > > >> > > use-cases
> > > >> > > > > >> that
> > > >> > > > > >> > > > > justified
> > > >> > > > > >> > > > > > > >> more
> > > >> > > > > >> > > > > > > >> > > > methods
> > > >> > > > > >> > > > > > > >> > > > > at this point. Some of the
> use-cases
> > > >> were
> > > >> > for
> > > >> > > > > more
> > > >> > > > > fine-grained
> > > >> > > > > >> > > > > > > >> latency
> > > >> > > > > >> > > > > > > >> > > > > collection, which could be done
> with
> > > >> Kafka
> > > >> > > > > Metrics.
> > > >> > > > > >> > > > Another
> > > >> > > > > >> > > > > > > >> use-case
> > > >> > > > > >> > > > > > > >> > > was
> > > >> > > > > >> > > > > > > >> > > > > encryption. However, there are
> > several
> > > >> > design
> > > >> > > > > >> options
> > > >> > > > > >> > > for
> > > >> > > > > >> > > > > > > >> encryption.
> > > >> > > > > >> > > > > > > >> > > One
> > > >> > > > > >> > > > > > > >> > > > > is to do per-record encryption
> which
> > > >> would
> > > >> > > > > require
> > > >> > > > > >> > > adding
> > > >> > > > > >> > > > > > > >> > > > > ProducerInterceptor.onEnqueued()
> and
> > > >> > > > > >> > > > > > > >> ConsumerInterceptor.onReceive().
> > > >> > > > > >> > > > > > > >> > > One
> > > >> > > > > >> > > > > > > >> > > > > could argue that in that case
> > > encryption
> > > >> > > could
> > > >> > > > be
> > > >> > > > > >> done
> > > >> > > > > >> > > by
> > > >> > > > > >> > > > > > > adding a
> > > >> > > > > >> > > > > > > >> > > custom
> > > >> > > > > >> > > > > > > >> > > > > serializer/deserializer. Another
> > > option
> > > >> is
> > > >> > to
> > > >> > > > do
> > > >> > > > > >> > > > encryption
> > > >> > > > > >> > > > > > > after
> > > >> > > > > >> > > > > > > >> > > message
> > > >> > > > > >> > > > > > > >> > > > > gets compressed, but there are
> > issues
> > > >> that
> > > >> > > > arise
> > > >> > > > > >> > > regarding
> > > >> > > > > >> > > > > > > broker
> > > >> > > > > >> > > > > > > >> > doing
> > > >> > > > > >> > > > > > > >> > > > > re-compression. We decided that it
> > is
> > > >> > better
> > > >> > > to
> > > >> > > > > >> have
> > > >> > > > > >> > > that
> > > >> > > > > >> > > > > > > >> discussion
> > > >> > > > > >> > > > > > > >> > > in a
> > > >> > > > > >> > > > > > > >> > > > > separate KIP and decide whether this
> is
> > > >> > > something
> > > >> > > > we
> > > >> > > > > >> want
> > > >> > > > > >> > > to
> > > >> > > > > >> > > > do
> > > >> > > > > >> > > > > > > with
> > > >> > > > > >> > > > > > > >> > > > > interceptors or by other means.
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > Todd, Mayuresh and others who
> missed
> > > the
> > > >> > KIP
> > > >> > > > > >> meeting,
> > > >> > > > > >> > > > please
> > > >> > > > > >> > > > > > let
> > > >> > > > > >> > > > > > > >> me
> > > >> > > > > >> > > > > > > >> > > know
> > > >> > > > > >> > > > > > > >> > > > > your thoughts on the scope we
> agreed
> > > on
> > > >> > > during
> > > >> > > > > the
> > > >> > > > > >> > > > meeting.
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > I will update the KIP proposal
> with
> > > the
> > > >> > > current
> > > >> > > > > >> > decision
> > > >> > > > > >> > > > by
> > > >> > > > > >> > > > > > end
> > > >> > > > > >> > > > > > > of
> > > >> > > > > >> > > > > > > >> > > today.
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > Thanks,
> > > >> > > > > >> > > > > > > >> > > > > Anna
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > On Tue, Jan 26, 2016 at 11:41 AM,
> > > >> Mayuresh
> > > >> > > > > Gharat <
> > > >> > > > > >> > > > > > > >> > > > > gharatmayures...@gmail.com>
> wrote:
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > > > > Hi,
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > I won't be able to make it to
> KIP
> > > >> hangout
> > > >> > > due
> > > >> > > > > to
> > > >> > > > > >> > > > conflict.
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > Anna, here is the use case where
> > > >> knowing
> > > >> > if
> > > >> > > > > there
> > > >> > > > > >> > are
> > > >> > > > > >> > > > > > messages
> > > >> > > > > >> > > > > > > >> in
> > > >> > > > > >> > > > > > > >> > the
> > > >> > > > > >> > > > > > > >> > > > > > RecordAccumulator left to be
> sent
> > to
> > > >> the
> > > >> > > > kafka
> > > >> > > > > >> > cluster
> > > >> > > > > >> > > > > for a
> > > >> > > > > >> > > > > > > >> topic
> > > >> > > > > >> > > > > > > >> > is
> > > >> > > > > >> > > > > > > >> > > > > > useful.
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > 1) Consider a pipeline :
> > > >> > > > > >> > > > > > > >> > > > > > A ---> Mirror-maker -----> B
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > 2) We have a topic T in cluster
> A
> > > >> > mirrored
> > > >> > > to
> > > >> > > > > >> > cluster
> > > >> > > > > >> > > B.
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > 3) Now if we delete topic T in A
> > and
> > > >> > > > > immediately
> > > >> > > > > >> > > proceed
> > > >> > > > > >> > > > > to
> > > >> > > > > >> > > > > > > >> delete
> > > >> > > > > >> > > > > > > >> > > the
> > > >> > > > > >> > > > > > > >> > > > > > topic in cluster B, some of the
> > > >> > > > > Mirror-maker
> > > >> > > > > >> > > > machines
> > > >> > > > > >> > > > > > die
> > > >> > > > > >> > > > > > > >> > because
> > > >> > > > > >> > > > > > > >> > > > > > at least one of the batches in
> > > >> > > > RecordAccumulator
> > > >> > > > > >> for
> > > >> > > > > >> > > > topic
> > > >> > > > > >> > > > > T
> > > >> > > > > >> > > > > > > >> fails to
> > > >> > > > > >> > > > > > > >> > > be
> > > >> > > > > >> > > > > > > >> > > > > > produced to cluster B. We have
> > seen
> > > >> this
> > > >> > > > > >> happening
> > > >> > > > > >> > in
> > > >> > > > > >> > > > our
> > > >> > > > > >> > > > > > > >> clusters.
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > If we know that there are no
> more
> > > >> > messages
> > > >> > > > left
> > > >> > > > > >> in
> > > >> > > > > >> > the
> > > >> > > > > >> > > > > > > >> > > > RecordAccumulator
> > > >> > > > > >> > > > > > > >> > > > > to
> > > >> > > > > >> > > > > > > >> > > > > > be produced to cluster B, we can
> > > >> safely
> > > >> > > > delete
> > > >> > > > > >> the
> > > >> > > > > >> > > topic
> > > >> > > > > >> > > > > in
> > > >> > > > > >> > > > > > > >> > cluster B
> > > >> > > > > >> > > > > > > >> > > > > > without disturbing the pipeline.
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > Thanks,
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > Mayuresh
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > On Tue, Jan 26, 2016 at 10:31
> AM,
> > > Anna
> > > >> > > > Povzner
> > > >> > > > > <
> > > >> > > > > >> > > > > > > >> a...@confluent.io>
> > > >> > > > > >> > > > > > > >> > > > > wrote:
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > Thanks Ismael and Todd for
> your
> > > >> > feedback!
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > I agree about coming up with
> > lean,
> > > >> but
> > > >> > > > useful
> > > >> > > > > >> > > > interfaces
> > > >> > > > > >> > > > > > > that
> > > >> > > > > >> > > > > > > >> > will
> > > >> > > > > >> > > > > > > >> > > be
> > > >> > > > > >> > > > > > > >> > > > > > easy
> > > >> > > > > >> > > > > > > >> > > > > > > to extend later.
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > When we discuss the minimal
> set
> > of
> > > >> > > producer
> > > >> > > > > and
> > > >> > > > > >> > > > consumer
> > > >> > > > > >> > > > > > > >> > > interceptor
> > > >> > > > > >> > > > > > > >> > > > > API
> > > >> > > > > >> > > > > > > >> > > > > > in
> > > >> > > > > >> > > > > > > >> > > > > > > today’s KIP meeting
> (discussion
> > > >> item #2
> > > >> > > in
> > > >> > > > my
> > > >> > > > > >> > > previous
> > > >> > > > > >> > > > > > > email),
> > > >> > > > > >> > > > > > > >> > let’s
> > > >> > > > > >> > > > > > > >> > > > > > compare
> > > >> > > > > >> > > > > > > >> > > > > > > two options:
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > *1. Minimal set of immutable
> API
> > > for
> > > >> > > > producer
> > > >> > > > > >> and
> > > >> > > > > >> > > > > consumer
> > > >> > > > > >> > > > > > > >> > > > > interceptors*
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > ProducerInterceptor:
> > > >> > > > > >> > > > > > > >> > > > > > > public void
> > > >> onSend(ProducerRecord<K, V>
> > > >> > > > > >> record);
> > > >> > > > > >> > > > > > > >> > > > > > > public void
> > > >> > > > onAcknowledgement(RecordMetadata
> > > >> > > > > >> > > metadata,
> > > >> > > > > >> > > > > > > >> Exception
> > > >> > > > > >> > > > > > > >> > > > > > > exception);
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > ConsumerInterceptor:
> > > >> > > > > >> > > > > > > >> > > > > > > public void
> > > >> > onConsume(ConsumerRecords<K,
> > > >> > > V>
> > > >> > > > > >> > > records);
> > > >> > > > > >> > > > > > > >> > > > > > > public void
> > > >> > onCommit(Map<TopicPartition,
> > > >> > > > > >> > > > > > OffsetAndMetadata>
> > > >> > > > > >> > > > > > > >> > > offsets);
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > Use-cases:
> > > >> > > > > >> > > > > > > >> > > > > > > — end-to-end monitoring;
> custom
> > > >> tracing
> > > >> > > and
> > > >> > > > > >> > logging
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > *2. Minimal set of mutable API
> > for
> > > >> > > producer
> > > >> > > > > and
> > > >> > > > > >> > > > consumer
> > > >> > > > > >> > > > > > > >> > > > interceptors*
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > ProducerInterceptor:
> > > >> > > > > >> > > > > > > >> > > > > > > ProducerRecord<K, V>
> > > >> > > > onSend(ProducerRecord<K,
> > > >> > > > > >> V>
> > > >> > > > > >> > > > > record);
> > > >> > > > > >> > > > > > > >> > > > > > > void
> > > >> onAcknowledgement(RecordMetadata
> > > >> > > > > metadata,
> > > >> > > > > >> > > > > Exception
> > > >> > > > > >> > > > > > > >> > > exception);
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > ConsumerInterceptor:
> > > >> > > > > >> > > > > > > >> > > > > > > void
> > onConsume(ConsumerRecords<K,
> > > V>
> > > >> > > > > records);
> > > >> > > > > >> > > > > > > >> > > > > > > void
> > onCommit(Map<TopicPartition,
> > > >> > > > > >> > OffsetAndMetadata>
> > > >> > > > > >> > > > > > > offsets);
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > Additional use-cases to #1:
> > > >> > > > > >> > > > > > > >> > > > > > > — Ability to add metadata to a
> > > >> message
> > > >> > or
> > > >> > > > > fill
> > > >> > > > > >> in
> > > >> > > > > >> > > > > standard
> > > >> > > > > >> > > > > > > >> fields
> > > >> > > > > >> > > > > > > >> > > for
> > > >> > > > > >> > > > > > > >> > > > > > audit
> > > >> > > > > >> > > > > > > >> > > > > > > and routing.
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > Implications
> > > >> > > > > >> > > > > > > >> > > > > > > — Partition assignment will be
> > > done
> > > >> > based
> > > >> > > > on
> > > >> > > > > >> > > modified
> > > >> > > > > >> > > > > > > >> key/value
> > > >> > > > > >> > > > > > > >> > > > instead
> > > >> > > > > >> > > > > > > >> > > > > > of
> > > >> > > > > >> > > > > > > >> > > > > > > original key/value. If
> key/value
> > > >> > > > > >> transformation is
> > > >> > > > > >> > > not
> > > >> > > > > >> > > > > > > >> consistent
> > > >> > > > > >> > > > > > > >> > > > (same
> > > >> > > > > >> > > > > > > >> > > > > > key
> > > >> > > > > >> > > > > > > >> > > > > > > and value does not mutate to
> the
> > > >> same,
> > > >> > > but
> > > >> > > > > >> > modified,
> > > >> > > > > >> > > > > > > >> key/value),
> > > >> > > > > >> > > > > > > >> > > then
> > > >> > > > > >> > > > > > > >> > > > > log
> > > >> > > > > >> > > > > > > >> > > > > > > compaction would not work.
> > > However,
> > > >> > audit
> > > >> > > > and
> > > >> > > > > >> > > routing
> > > >> > > > > >> > > > > > > >> use-cases
> > > >> > > > > >> > > > > > > >> > > from
> > > >> > > > > >> > > > > > > >> > > > > Todd
> > > >> > > > > >> > > > > > > >> > > > > > > will likely do consistent
> > > >> > transformation.
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > *Additional callbacks
> > (discussion
> > > >> item
> > > >> > #3
> > > >> > > > in
> > > >> > > > > my
> > > >> > > > > >> > > > previous
> > > >> > > > > >> > > > > > > >> email):*
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > If we want to support
> > encryption,
> > > we
> > > >> > > would
> > > >> > > > > >> want to
> > > >> > > > > >> > > be
> > > >> > > > > >> > > > > able
> > > >> > > > > >> > > > > > > to
> > > >> > > > > >> > > > > > > >> > > modify
> > > >> > > > > >> > > > > > > >> > > > > > > serialized key/value, rather
> > than
> > > >> key
> > > >> > and
> > > >> > > > > value
> > > >> > > > > >> > > > objects.
> > > >> > > > > >> > > > > > > This
> > > >> > > > > >> > > > > > > >> > will
> > > >> > > > > >> > > > > > > >> > > > add
> > > >> > > > > >> > > > > > > >> > > > > > the
> > > >> > > > > >> > > > > > > >> > > > > > > following API to producer and
> > > >> consumer
> > > >> > > > > >> > interceptors:
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > ProducerInterceptor:
> > > >> > > > > >> > > > > > > >> > > > > > > SerializedKeyValue
> > > >> > > > onEnqueued(TopicPartition
> > > >> > > > > >> tp,
> > > >> > > > > >> > > > > > > >> > ProducerRecord<K,
> > > >> > > > > >> > > > > > > >> > > V>
> > > >> > > > > >> > > > > > > >> > > > > > > record, SerializedKeyValue
> > > >> > > > > serializedKeyValue);
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > ConsumerInterceptor:
> > > >> > > > > >> > > > > > > >> > > > > > > SerializedKeyValue
> > > >> > > onReceive(TopicPartition
> > > >> > > > > tp,
> > > >> > > > > >> > > > > > > >> > SerializedKeyValue
> > > >> > > > > >> > > > > > > >> > > > > > > serializedKeyValue);
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > I am leaning towards
> > implementing
> > > >> the
> > > >> > > > minimal
> > > >> > > > > >> set
> > > >> > > > > >> > of
> > > >> > > > > >> > > > > > > >> immutable or
> > > >> > > > > >> > > > > > > >> > > > > mutable
> > > >> > > > > >> > > > > > > >> > > > > > > interfaces, making sure that
> we
> > > >> have a
> > > >> > > > > >> > compatibility
> > > >> > > > > >> > > > > plan
> > > >> > > > > >> > > > > > > that
> > > >> > > > > >> > > > > > > >> > > allows
> > > >> > > > > >> > > > > > > >> > > > > us
> > > >> > > > > >> > > > > > > >> > > > > > to
> > > >> > > > > >> > > > > > > >> > > > > > > add more callbacks in the
> future
> > > >> (per
> > > >> > > > Ismael’s
> > > >> > > > > >> > > comment),
> > > >> > > > > >> > > > > and
> > > >> > > > > >> > > > > > > add
> > > >> > > > > >> > > > > > > >> > more
> > > >> > > > > >> > > > > > > >> > > > > APIs
> > > >> > > > > >> > > > > > > >> > > > > > > later. E.g., for encryption
> > > >> use-case,
> > > >> > > there
> > > >> > > > > >> could
> > > >> > > > > >> > be
> > > >> > > > > >> > > > an
> > > >> > > > > >> > > > > > > >> argument
> > > >> > > > > >> > > > > > > >> > in
> > > >> > > > > >> > > > > > > >> > > > > doing
> > > >> > > > > >> > > > > > > >> > > > > > > encryption after message
> > > compression
> > > >> > vs.
> > > >> > > > > >> > per-record
> > > >> > > > > >> > > > > > > encryption
> > > >> > > > > >> > > > > > > >> > that
> > > >> > > > > >> > > > > > > >> > > > > could
> > > >> > > > > >> > > > > > > >> > > > > > > be done using the above
> > additional
> > > >> API.
> > > >> > > > There
> > > >> > > > > >> is
> > > >> > > > > >> > > also
> > > >> > > > > >> > > > > more
> > > >> > > > > >> > > > > > > >> > > > implications
> > > >> > > > > >> > > > > > > >> > > > > > for
> > > >> > > > > >> > > > > > > >> > > > > > > every API that modifies
> records:
> > > >> > > modifying
> > > >> > > > > >> > > serialized
> > > >> > > > > >> > > > > > > >> key/value
> > > >> > > > > >> > > > > > > >> > > will
> > > >> > > > > >> > > > > > > >> > > > > > again
> > > >> > > > > >> > > > > > > >> > > > > > > impact partition assignment
> (we
> > > will
> > > >> > > likely
> > > >> > > > > do
> > > >> > > > > >> > that
> > > >> > > > > >> > > > > after
> > > >> > > > > >> > > > > > > >> > partition
> > > >> > > > > >> > > > > > > >> > > > > > > assignment), which may impact
> > log
> > > >> > > > compaction
> > > >> > > > > >> and
> > > >> > > > > >> > > > mirror
> > > >> > > > > >> > > > > > > maker
> > > >> > > > > >> > > > > > > >> > > > > > partitioning.
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > Thanks,
> > > >> > > > > >> > > > > > > >> > > > > > > Anna
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino <tpal...@gmail.com> wrote:
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > Finally got a chance to take a look at this. I won’t be able to make the
> > > >> > > > > >> > > > > > > >> > > > > > > > KIP meeting due to a conflict.
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > I’m somewhat disappointed in this proposal. I think that the explicit
> > > >> > > > > >> > > > > > > >> > > > > > > > exclusion of modification of the messages is short-sighted, and not
> > > >> > > > > >> > > > > > > >> > > > > > > > accounting for it now is going to bite us later. Jay, aren’t you the one
> > > >> > > > > >> > > > > > > >> > > > > > > > railing against public interfaces and how difficult they are to work
> > > >> > > > > >> > > > > > > >> > > > > > > > with when you don’t get them right? The “simple” change to one of these
> > > >> > > > > >> > > > > > > >> > > > > > > > interfaces to make it able to return a record is going to be a
> > > >> > > > > >> > > > > > > >> > > > > > > > significant change and is going to require all clients to rewrite their
> > > >> > > > > >> > > > > > > >> > > > > > > > interceptors. If we’re not willing to put in the time to think through
> > > >> > > > > >> > > > > > > >> > > > > > > > manipulation now, then this KIP should be shelved until we are.
> > > >> > > > > >> > > > > > > >> > > > > > > > Implementing something halfway is going to be worse than taking a little
> > > >> > > > > >> > > > > > > >> > > > > > > > longer. In addition, I don’t believe that manipulation requires anything
> > > >> > > > > >> > > > > > > >> > > > > > > > more than interceptors to receive the full record, and then to return
> > > >> > > > > >> > > > > > > >> > > > > > > > it.
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > There are 3 use cases I can think of right now, without any deep
> > > >> > > > > >> > > > > > > >> > > > > > > > discussion, that can make use of interceptors with modification:
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > 1. Auditing. The ability to add metadata to a message for auditing is
> > > >> > > > > >> > > > > > > >> > > > > > > > critical. Hostname, service name, timestamps, etc. are all pieces of
> > > >> > > > > >> > > > > > > >> > > > > > > > data that can be used on the other side of the pipeline to categorize
> > > >> > > > > >> > > > > > > >> > > > > > > > messages, determine loss and transport time, and pin down issues. You
> > > >> > > > > >> > > > > > > >> > > > > > > > may say that these things can just be part of the message schema, but
> > > >> > > > > >> > > > > > > >> > > > > > > > anyone who has worked with a multi-user data system (especially those
> > > >> > > > > >> > > > > > > >> > > > > > > > who have been involved with LinkedIn) knows how difficult it is to
> > > >> > > > > >> > > > > > > >> > > > > > > > maintain consistent message schemas and to get other people to put in
> > > >> > > > > >> > > > > > > >> > > > > > > > fields for your use.
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > 2. Encryption. This is probably the most obvious case for record
> > > >> > > > > >> > > > > > > >> > > > > > > > manipulation on both sides. The ability to tie in end to end encryption
> > > >> > > > > >> > > > > > > >> > > > > > > > is important for data that requires external compliance (PCI, HIPAA,
> > > >> > > > > >> > > > > > > >> > > > > > > > etc.).
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > 3. Routing. By being able to add a bit of information about the source
> > > >> > > > > >> > > > > > > >> > > > > > > > or destination of a message to the metadata, you can easily construct an
> > > >> > > > > >> > > > > > > >> > > > > > > > intelligent mirror maker that can prevent loops. This has the
> > > >> > > > > >> > > > > > > >> > > > > > > > opportunity to result in significant operational savings, as you can get
> > > >> > > > > >> > > > > > > >> > > > > > > > rid of the need for tiered clusters in order to prevent loops in
> > > >> > > > > >> > > > > > > >> > > > > > > > mirroring messages.
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > All three of these share the feature that they add metadata to messages.
> > > >> > > > > >> > > > > > > >> > > > > > > > With the pushback on having arbitrary metadata as an “envelope” to the
> > > >> > > > > >> > > > > > > >> > > > > > > > message, this is a way to provide it and make it the responsibility of
> > > >> > > > > >> > > > > > > >> > > > > > > > the client, and not the Kafka broker and system itself.
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > -Todd
> > > >> > > > > >> > > > > > > >> > > > > > > >
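The claim above, that manipulation needs nothing more than an interceptor that receives the full record and returns it, can be sketched roughly as follows. This is only an illustration of the auditing case: `SketchRecord`, `MutatingInterceptor`, `AuditInterceptor`, and the `audit.*` keys are hypothetical names invented here, not types from the KIP or from Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical record type for illustration; not a Kafka class.
final class SketchRecord {
    final String topic;
    final byte[] value;
    final Map<String, String> metadata;

    SketchRecord(String topic, byte[] value, Map<String, String> metadata) {
        this.topic = topic;
        this.value = value;
        this.metadata = metadata;
    }
}

// Hypothetical contract: the interceptor receives the full record and
// returns the record that should actually be sent.
interface MutatingInterceptor {
    SketchRecord onSend(SketchRecord record);
}

// Auditing use case: attach origin metadata, leave the payload untouched.
final class AuditInterceptor implements MutatingInterceptor {
    private final String host;

    AuditInterceptor(String host) {
        this.host = host;
    }

    @Override
    public SketchRecord onSend(SketchRecord record) {
        // Copy the map so the caller's record is not mutated.
        Map<String, String> meta = new HashMap<>(record.metadata);
        meta.put("audit.host", host);
        meta.put("audit.sentAtMs", Long.toString(System.currentTimeMillis()));
        return new SketchRecord(record.topic, record.value, meta);
    }
}

final class AuditDemo {
    public static void main(String[] args) {
        SketchRecord in = new SketchRecord("events", new byte[] {1, 2}, new HashMap<>());
        SketchRecord out = new AuditInterceptor("host-1").onSend(in);
        System.out.println(out.metadata.keySet());
    }
}
```

Encryption and routing would presumably fit the same shape, returning a record with a transformed value or with extra routing metadata.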
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > Hi Anna and Neha,
> > > >> > > > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > I think it makes a lot of sense to try and keep the interface lean and
> > > >> > > > > >> > > > > > > >> > > > > > > > > to add more methods later when/if there is a need. What is the current
> > > >> > > > > >> > > > > > > >> > > > > > > > > thinking with regards to compatibility when/if we add new methods? A few
> > > >> > > > > >> > > > > > > >> > > > > > > > > options come to mind:
> > > >> > > > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > 1. Change the interface to an abstract class with empty implementations
> > > >> > > > > >> > > > > > > >> > > > > > > > > for all the methods. This means that the path to adding new methods is
> > > >> > > > > >> > > > > > > >> > > > > > > > > clear.
> > > >> > > > > >> > > > > > > >> > > > > > > > > 2. Hope we have moved to Java 8 by the time we need to add new methods
> > > >> > > > > >> > > > > > > >> > > > > > > > > and use default methods with an empty implementation for any new method
> > > >> > > > > >> > > > > > > >> > > > > > > > > (and potentially make existing methods default methods too at that point
> > > >> > > > > >> > > > > > > >> > > > > > > > > for consistency).
> > > >> > > > > >> > > > > > > >> > > > > > > > > 3. Introduce a new interface that inherits from the existing Interceptor
> > > >> > > > > >> > > > > > > >> > > > > > > > > interface when we need to add new methods.
> > > >> > > > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > Option 1 is the easiest and it also means that interceptor users only
> > > >> > > > > >> > > > > > > >> > > > > > > > > need to override the methods that they are interested in (more useful if
> > > >> > > > > >> > > > > > > >> > > > > > > > > the number of methods grows). The downside is that interceptor
> > > >> > > > > >> > > > > > > >> > > > > > > > > implementations cannot inherit from another class (a straightforward
> > > >> > > > > >> > > > > > > >> > > > > > > > > workaround is to make the interceptor a forwarder that calls another
> > > >> > > > > >> > > > > > > >> > > > > > > > > class). Also, our existing callbacks are interfaces, so this seems a bit
> > > >> > > > > >> > > > > > > >> > > > > > > > > inconsistent.
> > > >> > > > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > Option 2 may be the most appealing one as both users and ourselves
> > > >> > > > > >> > > > > > > >> > > > > > > > > retain flexibility. The main downside is that it relies on us moving to
> > > >> > > > > >> > > > > > > >> > > > > > > > > Java 8, which may be more than a year away potentially (if we support
> > > >> > > > > >> > > > > > > >> > > > > > > > > the last 2 Java releases).
> > > >> > > > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > Thoughts?
> > > >> > > > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > Ismael
> > > >> > > > > >> > > > > > > >> > > > > > > > >
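The three compatibility options can be compared side by side in a small sketch. All type and method names below (`Interceptor`, `AbstractInterceptor`, `DefaultingInterceptor`, `AckInterceptor`, `onSend`, `onAck`) are illustrative assumptions, not the interfaces proposed in the KIP; the point is only how each option lets a new method appear without breaking existing implementations.

```java
import java.util.ArrayList;
import java.util.List;

// Baseline: the interface as first released (method names are illustrative).
interface Interceptor {
    void onSend(String topic);
}

// Option 1: abstract class with empty bodies; onAck can be added later
// without breaking subclasses that only override onSend.
abstract class AbstractInterceptor {
    public void onSend(String topic) { }
    public void onAck(String topic) { }
}

// Option 2: Java 8 default method; existing implementors keep compiling.
interface DefaultingInterceptor {
    void onSend(String topic);
    default void onAck(String topic) { }
}

// Option 3: a sub-interface carries the new method.
interface AckInterceptor extends Interceptor {
    void onAck(String topic);
}

final class CompatDemo {
    // With option 3 the caller must check which interface it was given.
    static void dispatchAck(Interceptor interceptor, String topic) {
        if (interceptor instanceof AckInterceptor) {
            ((AckInterceptor) interceptor).onAck(topic);
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();

        AbstractInterceptor opt1 = new AbstractInterceptor() {
            @Override
            public void onSend(String topic) { calls.add("opt1.onSend"); }
        };
        opt1.onSend("t");
        opt1.onAck("t"); // inherited no-op

        DefaultingInterceptor opt2 = topic -> calls.add("opt2.onSend");
        opt2.onSend("t");
        opt2.onAck("t"); // default no-op

        Interceptor legacy = topic -> calls.add("legacy.onSend");
        dispatchAck(legacy, "t"); // skipped: not an AckInterceptor

        System.out.println(calls);
    }
}
```

One design consequence visible here: options 1 and 2 keep the call site unchanged, while option 3 pushes a runtime type check onto whoever invokes the interceptor.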
> > > >> > > > > >> > > > > > > >> > > > > > > > > On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede <n...@confluent.io> wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > Anna,
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > I'm also in favor of including just the APIs for which we have a clear
> > > >> > > > > >> > > > > > > >> > > > > > > > > > use case. If more use cases for finer monitoring show up in the future,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > we can always update the interface. Would you please highlight in the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > KIP the APIs that you think we have an immediate use for?
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > Joel,
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > Broker-side monitoring makes a lot of sense in the long term though I
> > > >> > > > > >> > > > > > > >> > > > > > > > > > don't think it is a requirement for end-to-end monitoring. With the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > producer and consumer interceptors, you have the ability to get full
> > > >> > > > > >> > > > > > > >> > > > > > > > > > publish-to-subscribe end-to-end monitoring. The broker interceptor
> > > >> > > > > >> > > > > > > >> > > > > > > > > > certainly improves the resolution of monitoring but it is also a riskier
> > > >> > > > > >> > > > > > > >> > > > > > > > > > change. I prefer an incremental approach over a big-bang and recommend
> > > >> > > > > >> > > > > > > >> > > > > > > > > > taking baby-steps. Let's first make sure the producer/consumer
> > > >> > > > > >> > > > > > > >> > > > > > > > > > interceptors are successful. And then come back and add the broker
> > > >> > > > > >> > > > > > > >> > > > > > > > > > interceptor carefully.
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > Having said that, it would be great to understand your proposal for the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > broker interceptor independently. We can either add an interceptor
> > > >> > > > > >> > > > > > > >> > > > > > > > > > on-append or on-commit. If people want to use this for monitoring, then
> > > >> > > > > >> > > > > > > >> > > > > > > > > > possibly on-commit might be more useful?
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > Thanks,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > Neha
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > Hey Joel,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > What is the interface you are thinking of? Something like this:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > >     onAppend(String topic, int partition, Records records, long time)
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > ?
> > > >> > > > > >> > > > > > > >> > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > One challenge right now is that we are still using the old
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > Message/MessageSet classes on the broker which I'm not sure if we'd want
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > to support over the long haul but it might be okay just to create the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > records instance for this interface.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > -Jay
> > > >> > > > > >> > > > > > > >> > > > > > > > > > >
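A rough sketch of how a broker-side hook with the signature floated above might be used for monitoring. Everything here is an assumption for illustration: `Records` is a minimal stand-in rather than the Kafka class of that name, and `BrokerInterceptor` and `AppendCounter` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the batch type the broker would pass; not the real Kafka class.
final class Records {
    final int messageCount;
    final int sizeInBytes;

    Records(int messageCount, int sizeInBytes) {
        this.messageCount = messageCount;
        this.sizeInBytes = sizeInBytes;
    }
}

// Hypothetical hook using the onAppend signature from the message above.
interface BrokerInterceptor {
    void onAppend(String topic, int partition, Records records, long time);
}

// Example use: count appended messages per topic-partition for monitoring.
// Not thread-safe; a real broker hook would need concurrent data structures.
final class AppendCounter implements BrokerInterceptor {
    private final Map<String, Long> counts = new HashMap<>();

    @Override
    public void onAppend(String topic, int partition, Records records, long time) {
        counts.merge(topic + "-" + partition, (long) records.messageCount, Long::sum);
    }

    long count(String topic, int partition) {
        return counts.getOrDefault(topic + "-" + partition, 0L);
    }
}

final class AppendDemo {
    public static void main(String[] args) {
        AppendCounter counter = new AppendCounter();
        counter.onAppend("events", 0, new Records(3, 120), System.currentTimeMillis());
        counter.onAppend("events", 0, new Records(2, 80), System.currentTimeMillis());
        System.out.println(counter.count("events", 0));
    }
}
```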
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > I'm definitely in favor of having such hooks in the produce/consume
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > pretty much how it was:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > i.e., we had something similar to the interceptor proposal for various
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > stages of the producer request. The producer provided call-backs for
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > LinkedIn we in fact did auditing within these call-backs (and not
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > explicitly in the wrapper). Over time and with 0.8 we moved that out to
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > the wrapper libraries.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
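To make the stage-by-stage call-back idea concrete, here is a toy sketch. The hook names follow the ones listed above, but the signatures, `ProducerStageCallbacks`, `AuditingCallbacks`, and `ToyProducer` are assumptions made up for this example and are not the 0.7 `CallbackHandler` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stage hooks named after the call-backs listed above;
// these are not the signatures of the 0.7 CallbackHandler.
interface ProducerStageCallbacks {
    String beforeEnqueue(String event);
    void afterEnqueue(String event, boolean added);
    List<String> afterDequeuing(List<String> batch);
    List<String> beforeSending(List<String> batch);
}

// Auditing inside the call-backs: count what enters the queue and what is sent.
final class AuditingCallbacks implements ProducerStageCallbacks {
    int enqueued;
    int sent;

    @Override
    public String beforeEnqueue(String event) { return event; }

    @Override
    public void afterEnqueue(String event, boolean added) {
        if (added) enqueued++;
    }

    @Override
    public List<String> afterDequeuing(List<String> batch) { return batch; }

    @Override
    public List<String> beforeSending(List<String> batch) {
        sent += batch.size();
        return batch;
    }
}

// Toy async producer that invokes each hook at the matching stage.
final class ToyProducer {
    private final List<String> queue = new ArrayList<>();
    private final ProducerStageCallbacks callbacks;

    ToyProducer(ProducerStageCallbacks callbacks) {
        this.callbacks = callbacks;
    }

    void send(String event) {
        String e = callbacks.beforeEnqueue(event);
        boolean added = queue.add(e);
        callbacks.afterEnqueue(e, added);
    }

    // Drains the queue and returns the batch that would go on the wire.
    List<String> flush() {
        List<String> batch = callbacks.afterDequeuing(new ArrayList<>(queue));
        queue.clear();
        return callbacks.beforeSending(batch);
    }
}
```

Comparing the `enqueued` and `sent` counters after a flush is the kind of loss check an audit hook could perform without a wrapper around the producer.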
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > On a side-note while audit and other monitoring can be done internally
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > in a convenient way I think it should be clarified that having a wrapper
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > is in general not a bad idea and I would even consider it to be a
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > best-practice. Even with 0.7 we still had a wrapper library and that API
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > has largely stayed the same and has helped protect against (sometimes
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > backwards incompatible) changes in open source.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > While we are on this topic
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > I have one comment and
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > Anna, you may have already
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > considered this but I
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > don't see mention of it in
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > the KIP:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > Add a custom message
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > interceptor/validator on
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > the broker on message
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > arrival.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > We decompress and do basic
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > validation of messages on
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > arrival. I think there is
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > value in supporting custom
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > validation and expanding
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > it to support custom
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > on-arrival processing.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > Here is a specific
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > use-case I have in mind.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > The blog that James
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > referenced describes our
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > auditing infrastructure.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > In order to audit the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > Kafka cluster itself we
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > need to run a "console
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > auditor" service that
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > consumes everything and
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > spits out audit events
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > back to the cluster. I
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > would prefer not having to
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > run this service because:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >    - Well, it is one more
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >    service that we have to
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >    run and monitor
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >    - Consuming everything
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >    takes up bandwidth which
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >    can be avoided
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >    - The console auditor
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >    consumer itself can lag
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >    and cause temporary
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >    audit discrepancies
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > One way we can mitigate
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > this is by having
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > mirror-makers in between
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > clusters emit audit
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > events. The problem is
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > that the very last cluster
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > in the pipeline will not
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > have any audit which is
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > why we need to have
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > something to audit the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > cluster.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > If we had a custom message
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > validator then the audit
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > can be done on-arrival and
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > we won't need a console
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > auditor.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > One potential issue in
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > this approach and any
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > elaborate on-arrival
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > processing for that matter
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > is that you may need to
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > deserialize the message as
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > well which can drive up
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > produce request handling
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > times. However I'm not
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > terribly concerned about
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > that especially if the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > audit header can be
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > separated out easily or
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > even deserialized
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > partially as this Avro
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > thread touches on
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > Thanks,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > Joel
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > On Mon, Jan 25, 2016 at
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > 12:02 PM, Mayuresh Gharat
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > <gharatmayures...@gmail.com>
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > Nice KIP. Excellent idea.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > Was just thinking if we
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > can add onDequed() to the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > ProducerInterceptor
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > interface. Since we have
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > the onEnqueued(), it will
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > help the client or the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > tools to know how much
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > time the message spent in
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > the RecordAccumulator.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > Also an API to check if
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > there are any messages
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > left for a particular
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > topic in the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > RecordAccumulator would
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > help.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > Thanks,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > Mayuresh
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > On Mon, Jan 25, 2016 at
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > 11:29 AM, Todd Palino
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > <tpal...@gmail.com> wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > Great idea. I’ve been
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > talking about this for 2
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > years, and I’m glad
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > someone is finally
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > picking it up. Will take
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > a look at the KIP at some
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > point shortly.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > -Todd
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > On Mon, Jan 25, 2016 at
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > 11:24 AM, Jay Kreps
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > <j...@confluent.io> wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > Hey Becket,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > Yeah this is really
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > similar to the callback.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > The difference is really
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > in who sets the behavior.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > The idea of the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > interceptor is that it
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > doesn't require any code
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > change in apps so you can
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > globally add behavior to
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > your Kafka usage without
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > changing app code.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > Whereas the callback is
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > added by the app. The
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > idea is to kind of
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > obviate the need for the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > wrapper code that e.g.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > LinkedIn maintains to
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > hold this kind of stuff.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > -Jay
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > >
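To make the distinction Jay draws above concrete, here is a minimal sketch of the two hooks side by side. Everything in it is an assumption made for illustration: SketchProducer, its nested Interceptor and Callback interfaces, and the immediate "acknowledgement" are hypothetical stand-ins, not the Kafka producer API or the interfaces proposed in KIP-42. The point is only who supplies the behavior: interceptors are installed once when the producer is built (in a real client, presumably from configuration), while a callback is passed by application code on each send.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature client for illustration only; NOT the Kafka producer API.
class SketchProducer {
    // Hook installed once per producer, e.g. from deployment configuration.
    interface Interceptor {
        void onAcknowledgement(String topic);
    }

    // Hook supplied by application code on an individual send.
    interface Callback {
        void onCompletion(String topic);
    }

    private final List<Interceptor> interceptors;

    SketchProducer(List<Interceptor> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    void send(String topic, String value, Callback callback) {
        // A real client would hand the record to the network layer here;
        // this sketch pretends the send is acknowledged immediately.
        for (Interceptor interceptor : interceptors) {
            interceptor.onAcknowledgement(topic); // runs for every send
        }
        if (callback != null) {
            callback.onCompletion(topic); // runs only if the app passed one
        }
    }
}
```

In this sketch an application that never passes a callback still gets the interceptor behavior on every send, which is the "no code change in apps" property described above.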
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > On Sun, Jan 24, 2016 at
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > 4:21 PM, Becket Qin
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > <becket....@gmail.com>
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > This could be a useful
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > feature. And I think
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > there are some use cases
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > to mutate the data like
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > rejected alternative one
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > mentioned.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > I am wondering if there
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > is functional overlap
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > between
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > ProducerInterceptor.onAcknowledgement()
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > and the producer
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > callback? I can see that
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > the Callback could be a
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > per record setting while
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > onAcknowledgement() is a
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > producer level setting.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > Other than that, is there
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > any difference between
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > them?
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > 6:21 PM, Neha Narkhede
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > <n...@confluent.io> wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > James,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > That is one of the many
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > monitoring use cases for
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > the interceptor
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > interface.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > Neha
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > On Fri, Jan 22, 2016 at
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > 6:18 PM, James Cheng
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > <jch...@tivo.com> wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > Anna,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > I'm trying to understand
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > a concrete use case. It
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > sounds like producer
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > interceptors could be
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > used to implement part of
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > LinkedIn's Kafka Audit
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > tool?
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > https://engineering.linkedin.com/kafka/running-kafka-scale
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > Part of that is done by a
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > wrapper library around
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > the Kafka producer that
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > keeps a count of the
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > number of messages
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > produced, and then sends
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > that count to a
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > side-topic. It sounds
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > like the producer
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > interceptors could
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > possibly be used to
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > implement that?
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > -James
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
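As a rough sketch of the counting half of what James describes (a per-topic count of produced messages that could later be published to a side-topic), something like the following would do. The SendInterceptor interface and the CountingAuditInterceptor class are hypothetical names invented for this illustration; they are not the KIP-42 interfaces and not LinkedIn's audit library. Publishing the counts to a side-topic is left out on purpose.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for a producer-side hook; NOT the actual Kafka interface.
interface SendInterceptor {
    String onSend(String topic, String value);
}

// Counts messages per topic, the way an audit wrapper library might.
class CountingAuditInterceptor implements SendInterceptor {
    private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

    @Override
    public String onSend(String topic, String value) {
        // Create the counter for a topic on first use, then increment it.
        counts.computeIfAbsent(topic, t -> new LongAdder()).increment();
        return value; // pass the record through unchanged
    }

    // Current count for a topic; a periodic task could publish this
    // to an audit side-topic (not shown in this sketch).
    long countFor(String topic) {
        LongAdder adder = counts.get(topic);
        return adder == null ? 0L : adder.sum();
    }
}
```

LongAdder is used here on the assumption that sends may come from several threads at once; a plain map of longs would not be safe in that case.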
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > On Jan 22, 2016, at 4:33
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > PM, Anna Povzner
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > <a...@confluent.io> wrote:
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > Hi,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > I just created a KIP-42
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > for adding producer and
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > consumer interceptors for
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > intercepting messages at
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > different points on
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > producer and consumer.
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > Comments and suggestions
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > are welcome!
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > Anna
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > --
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > Thanks,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > Neha
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > --
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > *—-*
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > *Todd Palino*
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > Staff Site Reliability Engineer
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > Data Infrastructure Streaming
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > > linkedin.com/in/toddpalino
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > --
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > -Regards,
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > Mayuresh R. Gharat
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > > (862) 250-7125
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > > >
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > > >
> > > >> > > > > >> > > > > > > >> > > > >
> > > >> > > > > >> > > > > > > >> > > >
> > > >> > > > > >> > > > > > > >> > > >
> > > >> > > > > >> > > > > > > >> > > >
> > > >> > > > > >> > > > > > > >> > > >
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> > >
> > > >> > > > > >> > > > > > > >> >
> > > >> > > > > >> > > > > > > >>
> > > >> > > > > >> > > > > > > >>
> > > >> > > > > >> > > > > > > >>
> > > >> > > > > >> > > > > > > >>
> > > >> > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > > >
> > > >> > > > > >> > > > > > >
> > > >> > > > > >> > > > > >
> > > >> > > > > >> > > > > >
> > > >> > > > > >> > > > > >
> > > >> > > > > >> > > > > >
> > > >> > > > > >> > > > >
> > > >> > > > > >> > > >
> > > >> > > > > >> > >
> > > >> > > > > >> >
> > > >> > > > > >>
> > > >> > > > > >>
> > > >> > > > > >>
> > > >> > > > > >>
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>



-- 
Thanks,
Neha
