Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-30 Thread Neha Narkhede
Nice finding on the CRC class. It will be great to switch to that at some
point.

On exposing the CRC -- I think we are overthinking the problem. A CRC over the
entire record makes sense for the durability check, and having multiple CRCs
is a bad idea.

On Fri, Jan 29, 2016 at 4:47 PM, Jay Kreps  wrote:

> The rationale for the CRC covering the whole record was to check corruption
> in the full record contents as corruption there will equally prevent
> someone trying to consume the data. I think you could argue either way but
> let's definitely not end up with two different CRC calculations; that would
> just be indecisive.
>
> That is a super interesting finding about the CRC class. I think you are
> right that it became an intrinsic in Java 8, with hardware support if available.
> We should really switch to that. The CRC calculation is still one of the
> biggest bottlenecks in the producer and consumer so that should
> significantly speed things up.
>
> -Jay
>
> On Fri, Jan 29, 2016 at 4:30 PM, Becket Qin  wrote:
>
> > Hi Anna,
> >
> > I think there is value if a CRC for only the user bytes can be used. This
> > will help when we have future protocol updates. Otherwise any protocol
> > migration might break auditing if it largely relies on a CRC that includes
> > system bytes.
> >
> > I did some tests to understand the performance overhead of having a
> > separate user-bytes CRC.
> >
> > On my desktop, it took ~4.7 seconds to compute the CRC over 10 GB, i.e.
> > about 0.47 ms per megabyte.
> > To compare with the compression cost, I used the compressor used by the
> > producer. The test uses GZIP and compresses 1 MB of data per batch.
> > It takes:
> > ~90 seconds to compress 10 GB consisting of a single repeated byte.
> > ~470 seconds to compress 10 GB of random bytes.
> > >1000 seconds to compress 10 GB of bytes with 4-, 8-, 16-, 32- and 64-byte
> > patterns. (I guess it takes time to update the pattern mapping.)
> >
> > So the overhead of a separate CRC computation is <1%, considering the
> > producer does a lot of things other than compression.
> >
> > I am not sure whether this overhead is worth taking, because we do have a
> > huge number of messages to send, so any overhead should be avoided if
> > possible.
> >
> > BTW, another interesting finding concerns the Crc32 class used in the
> > clients, which used to be faster than Java's CRC32 in Java 1.6. That no
> > longer seems to be the case: what I see is that the Java CRC32 class is 2x
> > faster than the Crc32 class we are using now.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Fri, Jan 29, 2016 at 1:46 PM, Anna Povzner  wrote:
> >
> > > Joel, thanks for your feedback. I updated the wiki based on your
> comments
> > > about the wiki writeup.
> > >
> > >
> > > On Fri, Jan 29, 2016 at 11:50 AM, Anna Povzner 
> > wrote:
> > >
> > > > Becket,
> > > >
> > > > In your scenario with one message from producer A and one message from
> > > > producer B, those are two different messages, and they should be
> > > > tracked as two different messages. So I would argue for using the
> > > > record CRC -- the CRC that is actually used by the system; it also does
> > > > not require computing a different CRC, which would add performance
> > > > overhead.
> > > >
> > > > If the broker ever changes the CRC, the scenarios when that happens
> > > should
> > > > be very well defined. As far as I know, the scenarios when CRC is
> > > > overwritten by the broker (including KIP-31/32 changes):
> > > > -- topic config is LogAppendTime for timestamp type
> > > > -- upgrade/downgrade
> > > > -- compression codec change (which could be inferred from config).
> > > >
> > > > Monitoring/audit just needs to know when CRCs are safe to use, which
> > > > most often is known from the config. In the future, this can be further
> > > > addressed by broker interceptors.
> > > >
> > > > Thanks,
> > > > Anna
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin 
> > > wrote:
> > > >
> > > >> Neha,
> > > >>
> > > >> CRC is definitely an important type of metadata for a record. I am not
> > > >> arguing about that. But I think we should distinguish between two
> > > >> types of checksum here: 1) a checksum of the user data, and 2) a
> > > >> checksum that includes system-appended bytes.
> > > >>
> > > >> I completely agree that (1) is good to add. But I am not sure if we
> > > >> should expose (2) to the user, because this means any underlying
> > > >> protocol change will give a different CRC for the exact same message.
> > > >> For example, let's say producer A is sending messages with a timestamp
> > > >> and producer B is sending messages without one. Even if they are given
> > > >> the exact same message, the CRCs returned would be different.
> > > >>
> > > >> Also, the Kafka broker will modify the system-appended bytes in
> > > >> different

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-29 Thread Neha Narkhede
Becket,

Is your concern the presence of CRC in the RecordMetadata or do you want to
brainstorm how CRC can be used for auditing? I think we shouldn't try to
think about the various ways that people can do monitoring using
interceptors and the metadata we provide. The entire point of having
pluggable interceptors is so that people can employ their own creative
mechanisms to make use of interceptors.
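
For concreteness, here is a minimal sketch of one such mechanism: a
hypothetical producer-side audit interceptor that counts acknowledged records
and logs the checksum this KIP proposes to expose on RecordMetadata. The
interface shape and the checksum() accessor follow the KIP wiki and should be
read as proposed, not final, API.

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Hypothetical audit interceptor: counts acknowledged records and logs
    // the checksum proposed for RecordMetadata in this KIP.
    public class ChecksumAuditProducerInterceptor
            implements ProducerInterceptor<String, String> {
        private final AtomicLong acked = new AtomicLong();

        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return record;  // audit only; pass the record through unmodified
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception == null && metadata != null)
                System.out.printf("acked #%d %s-%d@%d crc=%d%n",
                        acked.incrementAndGet(), metadata.topic(), metadata.partition(),
                        metadata.offset(), metadata.checksum());
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }

Such an interceptor would be wired in through the interceptor.classes producer
config the KIP proposes.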

I do think that it is worth discussing whether or not CRC makes sense as
record metadata to the user. My take is that the CRC is the best size-bounded
summary of the serialized record content available to us, and one that would be
expensive for the user to recompute. I'd argue this summary of a record
qualifies as its metadata. After all, we use the record CRC for a very
important durability check as the record travels through the system.

1. Doesn't the TopicPartition + Offset already uniquely identify a message?
> It seems better than the CRC from both a summary point of view and an
> auditing point of view.


The offset is a uniqueness value assigned by the system to the message. If you
trusted the system that much, you are not looking to monitor it out-of-band
:-)


> 2. Currently the CRC only has 4 bytes, so there will be collisions once there
> are more than ~4 billion messages. Take LinkedIn as an example: we have 1.3
> trillion messages per day, so there will be at least a couple of hundred
> collisions for each CRC value every day, whereas TopicPartition+Offset will
> not have any collisions.


The CRC isn't sent over the wire and doesn't add any extra overhead in
processing, so what is your concern? If you aren't convinced about its
usefulness, you can always use the default do-nothing interceptor at
LinkedIn and ignore the CRC.

Without having
> the entire message bytes, they may not be able to verify its correctness,
> and the CRC could even be invalid if the broker ever overwrote any field
> or did a format conversion.
>

This doesn't make sense to me. The CRC is used for the most important
durability check by Kafka - to verify that the message was not garbled over
the wire. The system can't change it; it has to match on the consumer side
or we will not return it to the user.
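
To make the out-of-band monitoring concrete: checksums captured by producer-
and consumer-side interceptors can be reconciled offline. A tiny hypothetical
sketch (not a Kafka API), joining the two audit streams on
topic-partition@offset:

    import java.util.Map;
    import java.util.Objects;

    // Hypothetical reconciliation step for audit logs collected out-of-band:
    // key = "topic-partition@offset", value = checksum recorded by an interceptor.
    public class ChecksumReconciler {
        public static long countMismatches(Map<String, Long> producedCrcs,
                                           Map<String, Long> consumedCrcs) {
            return producedCrcs.entrySet().stream()
                    .filter(e -> consumedCrcs.containsKey(e.getKey()))
                    .filter(e -> !Objects.equals(e.getValue(), consumedCrcs.get(e.getKey())))
                    .count();
        }
    }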

On Fri, Jan 29, 2016 at 3:23 AM, Becket Qin  wrote:

> Anna,
>
> It is still not clear to me why we should expose the CRC to the end user.
> Here are my points of confusion:
>
> 1. Doesn't the TopicPartition + Offset already uniquely identify a message?
> It seems better than the CRC from both a summary point of view and an
> auditing point of view.
>
> 2. Currently the CRC only has 4 bytes, so there will be collisions once
> there are more than ~4 billion messages. Take LinkedIn as an example: we
> have 1.3 trillion messages per day, so there will be at least a couple of
> hundred collisions for each CRC value every day, whereas
> TopicPartition+Offset will not have any collisions.
>
> 3. The CRC is calculated after all the fields have been filled in by the
> producer, including the timestamp, attributes, etc. It might also get
> recomputed on the broker side. So if users only get the CRC from the record
> metadata, without having the entire message bytes, they may not be able to
> verify its correctness, and the CRC could even be invalid if the broker ever
> overwrote any field or did a format conversion.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Thu, Jan 28, 2016 at 5:58 PM, Anna Povzner  wrote:
>
> > On second thought, yes, I think we should expose a record size that
> > represents application bytes. This is Becket's option #1.
> >
> > I updated the KIP wiki with new fields in RecordMetadata and
> > ConsumerRecord.
> >
> > I would like to start a voting thread tomorrow if there are no objections
> > or more things to discuss.
> >
> > Thanks,
> > Anna
> >
> >
> >
> > On Thu, Jan 28, 2016 at 2:25 PM, Anna Povzner  wrote:
> >
> > > Regarding record size as bytes sent over the wire, my concern is that it
> > > is almost impossible to calculate per record. We could compute it as: 1)
> > > compressed bytes divided by the number of records in a compressed
> > > message, as Todd mentioned; or 2) the same as #1 but proportional to each
> > > record's uncompressed size vs. the total uncompressed size of all
> > > records. All of these calculations give us an estimate. So maybe record
> > > size as bytes sent over the wire is not per-record metadata, but rather a
> > > per-topic/partition measure that is better exposed through metrics?
> > >
> > >
> > > On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino 
> wrote:
> > >
> > >> It may be difficult (or nearly impossible) to get actual compressed
> > bytes
> > >> for a message from a compressed batch, but I do think it’s useful
> > >> information to have available for the very reason noted, bandwidth
> > >> consumed. Does it make sense to have an interceptor at the batch level
> > >> that
> > >> can provide this? The other option is to estimate it (such as making
> an
> > 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-29 Thread Becket Qin
Anna,

It is still not clear to me why we should expose the CRC to the end user.
Here are my points of confusion:

1. Doesn't the TopicPartition + Offset already uniquely identify a message?
It seems better than the CRC from both a summary point of view and an auditing
point of view.

2. Currently the CRC only has 4 bytes, so there will be collisions once there
are more than ~4 billion messages. Take LinkedIn as an example: we have 1.3
trillion messages per day, so there will be at least a couple of hundred
collisions for each CRC value every day, whereas TopicPartition+Offset will
not have any collisions (a quick sanity check follows the list below).

3. The CRC is calculated after all the fields have been filled in by the
producer, including the timestamp, attributes, etc. It might also get
recomputed on the broker side. So if users only get the CRC from the record
metadata, without having the entire message bytes, they may not be able to
verify its correctness, and the CRC could even be invalid if the broker ever
overwrote any field or did a format conversion.
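
A quick sanity check of the estimate in point 2, using the numbers from this
email (plain pigeonhole arithmetic, not a Kafka API):

    public class CrcCollisionEstimate {
        public static void main(String[] args) {
            double messagesPerDay = 1.3e12;      // LinkedIn's daily volume, per the email
            double crcValues = Math.pow(2, 32);  // a 4-byte CRC has ~4.29 billion values
            // On average each CRC value is hit ~303 times per day, so "a couple
            // of hundred collisions per CRC value" is the right order of magnitude.
            System.out.printf("avg messages per CRC value per day: %.0f%n",
                    messagesPerDay / crcValues);
        }
    }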

Thanks,

Jiangjie (Becket) Qin




On Thu, Jan 28, 2016 at 5:58 PM, Anna Povzner  wrote:

> On second thought, yes, I think we should expose a record size that
> represents application bytes. This is Becket's option #1.
>
> I updated the KIP wiki with new fields in RecordMetadata and
> ConsumerRecord.
>
> I would like to start a voting thread tomorrow if there are no objections
> or more things to discuss.
>
> Thanks,
> Anna
>
>
>
> On Thu, Jan 28, 2016 at 2:25 PM, Anna Povzner  wrote:
>
> > Regarding record size as bytes sent over the wire, my concern is that it
> > is almost impossible to calculate per record. We could compute it as: 1)
> > compressed bytes divided by the number of records in a compressed message,
> > as Todd mentioned; or 2) the same as #1 but proportional to each record's
> > uncompressed size vs. the total uncompressed size of all records. All of
> > these calculations give us an estimate. So maybe record size as bytes sent
> > over the wire is not per-record metadata, but rather a per-topic/partition
> > measure that is better exposed through metrics?
> >
> >
> > On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino  wrote:
> >
> >> It may be difficult (or nearly impossible) to get actual compressed
> bytes
> >> for a message from a compressed batch, but I do think it’s useful
> >> information to have available for the very reason noted, bandwidth
> >> consumed. Does it make sense to have an interceptor at the batch level
> >> that
> >> can provide this? The other option is to estimate it (such as making an
> >> assumption that the messages in a batch are equal in size, which is not
> >> necessarily true), which is probably not the right answer.
> >>
> >> -Todd
> >>
> >>
> >> On Thu, Jan 28, 2016 at 1:48 PM, Anna Povzner 
> wrote:
> >>
> >> > Hi Becket,
> >> >
> >> > It will be up to the interceptor to implement their audit or
> monitoring
> >> > strategy. I would also imagine there is more than one good way to do
> >> audit.
> >> > So, I agree that some of the interceptors may not use CRC, and we will
> >> not
> >> > require it. The question is now whether intercepting CRCs is needed. I
> >> > think they are very useful for monitoring and audit, because the CRC
> >> > provides an easy way to get a summary of a message, rather than using
> >> > the message bytes or key/value objects.
> >> >
> >> > Regarding record size, I agree that the bandwidth example was not a
> >> > good one. I
> >> > think it would be hard to get actual bytes sent over the wire (your
> #2),
> >> > since multiple records get compressed together and we would need to
> >> decide
> >> > which bytes to account to which record. So I am inclined to only do
> your
> >> > #1. However, it still makes more sense to me just to return record
> size
> >> > including the header, since this is the actual record size.
> >> >
> >> > Thanks,
> >> > Anna
> >> >
> >> > On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin 
> >> wrote:
> >> >
> >> > > Anna,
> >> > >
> >> > > Using the CRC to do end-to-end auditing might be very costly, because
> >> > > you will need to collect all the CRCs from both the producer and the
> >> > > consumer. And it is based on the assumption that the broker does not
> >> > > modify the record. Can you shed some light on how end-to-end auditing
> >> > > would use the CRC before we decide to expose such low-level detail to
> >> > > the end user? It would also be helpful if you could compare it with
> >> > > something like sequence-number-based auditing.
> >> > >
> >> > > About the record size, one thing worth noting is that the size of a
> >> > > Record is not the actual number of bytes sent over the wire if we use
> >> > > compression, so it does not really tell the user how much bandwidth
> >> > > they are using. Personally I think two kinds of size may be useful:
> >> > > 1. The record size after serialization, i.e. application bytes. (The
> >> > 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-29 Thread Joel Koshy
Responding to some of the earlier comments in the thread:

@Jay/@Neha,

I think any one of onCommit/onAppend/onArrival would work for the concrete
use-case that I had outlined. I think onArrival is additionally useful for
custom validation - i.e., reject the message and do not append if it
violates some cluster-specific rule (e.g., if some header timestamp is older
than xyz). However, the catch with user-supplied validation is that we would
have to make do with a (new) generic error code in the producer response.
While there is a risk of a broker interceptor having high latency I think
that is acceptable since it is the user's responsibility to ensure low
latency - the producer call-back and onAcknowledgment interceptor are
similar in this regard although those are less risky. Even so, I think
there are clear use-cases for broker interceptors so I feel the risk part
is something that just needs to be documented. @Jay that is a good point
about moving from Message/MessageSet to Records although that may be less
difficult to absorb since it is a broker-side interceptor and so people
don't need to get a ton of applications in their company to switch to use
it.

Re: onEnqueued: monitoring serialization latency can be done via metrics
but this is more useful for recording whether serialization succeeded or
not. onAcknowledgment subsumes this but it also subsumes other possible
errors (such as produce errors). It is more fine-grained than most people
need though (i.e., I don't think we will use it even if it is present.)

Re: checksums: I think it is a good addition to the metadata; and for
low-volume or super-critical topics it can be used for very strict auditing.
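
To make the strict-auditing idea concrete, a minimal sketch of the
consumer-side counterpart, assuming the ConsumerInterceptor shape and the
ConsumerRecord.checksum() accessor proposed in this KIP (proposed API, not
final):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical strict-audit interceptor: logs topic-partition@offset plus
    // the record checksum so it can be reconciled against producer-side logs.
    public class StrictAuditConsumerInterceptor
            implements ConsumerInterceptor<byte[], byte[]> {
        @Override
        public ConsumerRecords<byte[], byte[]> onConsume(ConsumerRecords<byte[], byte[]> records) {
            for (ConsumerRecord<byte[], byte[]> r : records)
                System.out.printf("consumed %s-%d@%d crc=%d%n",
                        r.topic(), r.partition(), r.offset(), r.checksum());
            return records;  // pass records through unmodified
        }

        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }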

There are a couple of typos/edits for the wiki itself:

   - Under Kafka Producer changes:
   - you have references to KafkaConsumer constructor and
  ConsumerConfig.originals.
  - sendRecord -> sentRecord (may be clearer)
   - Under ProducerInterceptor interface: there is a mention of onEnqueued
   which was rejected
   - Comment for ConsumerRecord.record should probably be: // NEW: record
   size in bytes (*after decompression*)


BTW - Anna, nice work on the KIP!

Joel

On Fri, Jan 29, 2016 at 6:57 AM, Neha Narkhede  wrote:

> Becket,
>
> Is your concern the presence of CRC in the RecordMetadata or do you want to
> brainstorm how CRC can be used for auditing? I think we shouldn't try to
> think about the various ways that people can do monitoring using
> interceptors and the metadata we provide. The entire point of having
> pluggable interceptors is so that people can employ their own creative
> mechanisms to make use of interceptors.
>
> I do think that it is worth discussing whether or not CRC makes sense as
> record metadata to the user. My take is that the CRC is the best
> size-bounded summary of the serialized record content available to us, and
> one that would be expensive for the user to recompute. I'd argue this
> summary of a record qualifies as its metadata. After all, we use the record
> CRC for a very important durability check as the record travels through the
> system.
>
> 1. Doesn't the TopicPartition + Offset already uniquely identify a message?
> > It seems better than the CRC from both a summary point of view and an
> > auditing point of view.
>
>
> The offset is a uniqueness value assigned by the system to the message. If you
> trusted the system that much, you are not looking to monitor it out-of-band
> :-)
>
>
> > 2. Currently the CRC only has 4 bytes, so there will be collisions once
> > there are more than ~4 billion messages. Take LinkedIn as an example: we
> > have 1.3 trillion messages per day, so there will be at least a couple of
> > hundred collisions for each CRC value every day, whereas
> > TopicPartition+Offset will not have any collisions.
>
>
> The CRC isn't sent over the wire and doesn't add any extra overhead in
> processing, so what is your concern? If you aren't convinced about its
> usefulness, you can always use the default do-nothing interceptor at
> LinkedIn and ignore the CRC.
>
> Without having
> > the entire message bytes, they may not be able to verify its correctness,
> > and the CRC could even be invalid if the broker ever overwrote any field
> > or did a format conversion.
> >
>
> This doesn't make sense to me. The CRC is used for the most important
> durability check by Kafka - to verify that the message was not garbled over
> the wire. The system can't change it; it has to match on the consumer side
> or we will not return it to the user.
>
> On Fri, Jan 29, 2016 at 3:23 AM, Becket Qin  wrote:
>
> > Anna,
> >
> > It is still not clear to me why we should expose the CRC to the end user.
> > Here are my points of confusion:
> >
> > 1. Doesn't the TopicPartition + Offset already uniquely identify a
> > message? It seems better than the CRC from both a summary point of view
> > and an auditing point of view.
> >
> > 2. Currently the CRC only has 4 bytes, so there will be collisions once
> > there
> 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-29 Thread Jay Kreps
The rationale for the CRC covering the whole record was to check corruption
in the full record contents as corruption there will equally prevent
someone trying to consume the data. I think you could argue either way but
let's definitely not end up with two different CRC calculations; that would
just be indecisive.

That is a super interesting finding about the CRC class. I think you are
right that it became an intrinsic in Java 8, with hardware support if available.
We should really switch to that. The CRC calculation is still one of the
biggest bottlenecks in the producer and consumer so that should
significantly speed things up.

-Jay

On Fri, Jan 29, 2016 at 4:30 PM, Becket Qin  wrote:

> Hi Anna,
>
> I think there is value if a CRC for only the user bytes can be used. This
> will help when we have future protocol updates. Otherwise any protocol
> migration might break auditing if it largely relies on a CRC that includes
> system bytes.
>
> I did some tests to understand the performance overhead of having a separate
> user-bytes CRC.
>
> On my desktop, it took ~4.7 seconds to compute the CRC over 10 GB, i.e.
> about 0.47 ms per megabyte.
> To compare with the compression cost, I used the compressor used by the
> producer. The test uses GZIP and compresses 1 MB of data per batch.
> It takes:
> ~90 seconds to compress 10 GB consisting of a single repeated byte.
> ~470 seconds to compress 10 GB of random bytes.
> >1000 seconds to compress 10 GB of bytes with 4-, 8-, 16-, 32- and 64-byte
> patterns. (I guess it takes time to update the pattern mapping.)
>
> So the overhead of a separate CRC computation is <1%, considering the
> producer does a lot of things other than compression.
>
> I am not sure whether this overhead is worth taking, because we do have a
> huge number of messages to send, so any overhead should be avoided if
> possible.
>
> BTW, another interesting finding concerns the Crc32 class used in the
> clients, which used to be faster than Java's CRC32 in Java 1.6. That no
> longer seems to be the case: what I see is that the Java CRC32 class is 2x
> faster than the Crc32 class we are using now.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Fri, Jan 29, 2016 at 1:46 PM, Anna Povzner  wrote:
>
> > Joel, thanks for your feedback. I updated the wiki based on your comments
> > about the wiki writeup.
> >
> >
> > On Fri, Jan 29, 2016 at 11:50 AM, Anna Povzner 
> wrote:
> >
> > > Becket,
> > >
> > > In your scenario with one message from producer A and one message from
> > > producer B, those are two different messages, and they should be tracked
> > > as two different messages. So I would argue for using the record CRC --
> > > the CRC that is actually used by the system; it also does not require
> > > computing a different CRC, which would add performance overhead.
> > >
> > > If the broker ever changes the CRC, the scenarios when that happens
> > should
> > > be very well defined. As far as I know, the scenarios when CRC is
> > > overwritten by the broker (including KIP-31/32 changes):
> > > -- topic config is LogAppendTime for timestamp type
> > > -- upgrade/downgrade
> > > -- compression codec change (which could be inferred from config).
> > >
> > > Monitoring/audit just needs to know when CRCs are safe to use, which is
> > > most often known from the config. In the future, this can be further
> > > addressed by broker interceptors.
> > >
> > > Thanks,
> > > Anna
> > >
> > >
> > >
> > >
> > > On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin 
> > wrote:
> > >
> > >> Neha,
> > >>
> > >> CRC is definitely an important type of metadata for a record. I am not
> > >> arguing about that. But I think we should distinguish between two types
> > >> of checksum here: 1) a checksum of the user data, and 2) a checksum
> > >> that includes system-appended bytes.
> > >>
> > >> I completely agree that (1) is good to add. But I am not sure if we
> > >> should expose (2) to the user, because this means any underlying
> > >> protocol change will give a different CRC for the exact same message.
> > >> For example, let's say producer A is sending messages with a timestamp
> > >> and producer B is sending messages without one. Even if they are given
> > >> the exact same message, the CRCs returned would be different.
> > >>
> > >> Also, the Kafka broker will modify the system-appended bytes in
> > >> different scenarios, such as a compression codec change or message
> > >> format conversion (after KIP-31 and KIP-32).
> > >>
> > >> So my concern is that we are exposing a CRC that includes
> > >> system-appended bytes to the user.
> > >>
> > >> Other than this I think everything looks good. Nice work, Anna.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >> On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy 
> > wrote:
> > >>
> > >> > Responding to some of the earlier comments in the thread:
> > >> >
> > >> > @Jay/@Neha,
> > >> >
> > >> > I think any one 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-29 Thread Becket Qin
Hi Anna,

I think there is value if a CRC for only the user bytes can be used. This will
help when we have future protocol updates. Otherwise any protocol migration
might break auditing if it largely relies on a CRC that includes system bytes.

I did some tests to understand the performance overhead of having a separate
user-bytes CRC.

On my desktop, it took ~4.7 seconds to compute the CRC over 10 GB, i.e. about
0.47 ms per megabyte.
To compare with the compression cost, I used the compressor used by the
producer. The test uses GZIP and compresses 1 MB of data per batch.
It takes:
~90 seconds to compress 10 GB consisting of a single repeated byte.
~470 seconds to compress 10 GB of random bytes.
>1000 seconds to compress 10 GB of bytes with 4-, 8-, 16-, 32- and 64-byte
patterns. (I guess it takes time to update the pattern mapping.)

So the overhead of a separate CRC computation is <1%, considering the producer
does a lot of things other than compression.

I am not sure whether this overhead is worth taking, because we do have a huge
number of messages to send, so any overhead should be avoided if possible.

BTW, another interesting finding concerns the Crc32 class used in the clients,
which used to be faster than Java's CRC32 in Java 1.6. That no longer seems to
be the case: what I see is that the Java CRC32 class is 2x faster than the
Crc32 class we are using now.
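
For reference, a minimal reconstruction of this kind of micro-benchmark using
only JDK classes (a sketch: batch sizes and totals mirror the numbers above,
absolute timings are machine-dependent, and Kafka's own
org.apache.kafka.common.utils.Crc32 could be timed the same way to reproduce
the 2x comparison):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Random;
    import java.util.zip.CRC32;
    import java.util.zip.GZIPOutputStream;

    public class CrcVsGzipBench {
        public static void main(String[] args) throws IOException {
            byte[] batch = new byte[1024 * 1024];   // 1 MB per batch
            new Random(42).nextBytes(batch);        // random bytes (GZIP's hard case)
            int batches = 10 * 1024;                // ~10 GB total

            long t0 = System.nanoTime();
            CRC32 crc = new CRC32();                // intrinsified on Java 8+ JVMs
            for (int i = 0; i < batches; i++)
                crc.update(batch, 0, batch.length);
            System.out.printf("CRC32 over ~10 GB: %.1f s%n", (System.nanoTime() - t0) / 1e9);

            t0 = System.nanoTime();
            for (int i = 0; i < batches; i++) {
                GZIPOutputStream gz = new GZIPOutputStream(new ByteArrayOutputStream());
                gz.write(batch);                    // compress one 1 MB batch
                gz.close();
            }
            System.out.printf("GZIP over ~10 GB: %.1f s%n", (System.nanoTime() - t0) / 1e9);
        }
    }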

Thanks,

Jiangjie (Becket) Qin


On Fri, Jan 29, 2016 at 1:46 PM, Anna Povzner  wrote:

> Joel, thanks for your feedback. I updated the wiki based on your comments
> about the wiki writeup.
>
>
> On Fri, Jan 29, 2016 at 11:50 AM, Anna Povzner  wrote:
>
> > Becket,
> >
> > In your scenario with one message from producer A and one message from
> > producer B, those are two different messages, and they should be tracked
> > as two different messages. So I would argue for using the record CRC --
> > the CRC that is actually used by the system; it also does not require
> > computing a different CRC, which would add performance overhead.
> >
> > If the broker ever changes the CRC, the scenarios when that happens
> should
> > be very well defined. As far as I know, the scenarios when CRC is
> > overwritten by the broker (including KIP-31/32 changes):
> > -- topic config is LogAppendTime for timestamp type
> > -- upgrade/downgrade
> > -- compression codec change (which could be inferred from config).
> >
> > Monitoring/audit just needs to know when CRCs are safe to use, which is
> > most often known from the config. In the future, this can be further
> > addressed by broker interceptors.
> >
> > Thanks,
> > Anna
> >
> >
> >
> >
> > On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin 
> wrote:
> >
> >> Neha,
> >>
> >> CRC is definitely an important type of metadata for a record. I am not
> >> arguing about that. But I think we should distinguish between two types
> >> of checksum here: 1) a checksum of the user data, and 2) a checksum that
> >> includes system-appended bytes.
> >>
> >> I completely agree that (1) is good to add. But I am not sure if we
> >> should expose (2) to the user, because this means any underlying protocol
> >> change will give a different CRC for the exact same message. For example,
> >> let's say producer A is sending messages with a timestamp and producer B
> >> is sending messages without one. Even if they are given the exact same
> >> message, the CRCs returned would be different.
> >>
> >> Also, the Kafka broker will modify the system-appended bytes in different
> >> scenarios, such as a compression codec change or message format
> >> conversion (after KIP-31 and KIP-32).
> >>
> >> So my concern is that we are exposing a CRC that includes system-appended
> >> bytes to the user.
> >>
> >> Other than this I think everything looks good. Nice work, Anna.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy 
> wrote:
> >>
> >> > Responding to some of the earlier comments in the thread:
> >> >
> >> > @Jay/@Neha,
> >> >
> >> > I think any one of onCommit/onAppend/onArrival would work for the
> >> concrete
> >> > use-case that I had outlined. I think onArrival is additionally useful
> >> for
> >> > custom validation - i.e., reject the message and do not append if it
> >> > violates some cluster-specific rule (e.g., if some header timestamp is
> >> > older than xyz). However, the catch with user-supplied validation is
> >> > that we would have to make do with a (new) generic error code in the
> >> > producer response.
> >> > While there is a risk of a broker interceptor having high latency I
> >> think
> >> > that is acceptable since it is the user's responsibility to ensure low
> >> > latency - the producer call-back and onAcknowledgment interceptor are
> >> > similar in this regard although those are less risky. Even so, I think
> >> > there are clear use-cases for broker interceptors so I feel the risk
> >> part
> >> > is something that just needs to be documented. 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-29 Thread Anna Povzner
Joel, thanks for your feedback. I updated the wiki based on your comments
about the wiki writeup.


On Fri, Jan 29, 2016 at 11:50 AM, Anna Povzner  wrote:

> Becket,
>
> In your scenario with one message from producer A and one message from
> producer B, those are two different messages, and they should be tracked as
> two different messages. So I would argue for using the record CRC -- the
> CRC that is actually used by the system; it also does not require computing
> a different CRC, which would add performance overhead.
>
> If the broker ever changes the CRC, the scenarios when that happens should
> be very well defined. As far as I know, the scenarios when CRC is
> overwritten by the broker (including KIP-31/32 changes):
> -- topic config is LogAppendTime for timestamp type
> -- upgrade/downgrade
> -- compression codec change (which could be inferred from config).
>
> Monitoring/audit just needs to know when CRCs are safe to use, which is
> most often known from the config. In the future, this can be further
> addressed by broker interceptors.
>
> Thanks,
> Anna
>
>
>
>
> On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin  wrote:
>
>> Neha,
>>
>> CRC is definitely an important type of metadata for a record. I am not
>> arguing about that. But I think we should distinguish between two types of
>> checksum here: 1) a checksum of the user data, and 2) a checksum that
>> includes system-appended bytes.
>>
>> I completely agree that (1) is good to add. But I am not sure if we should
>> expose (2) to the user, because this means any underlying protocol change
>> will give a different CRC for the exact same message. For example, let's
>> say producer A is sending messages with a timestamp and producer B is
>> sending messages without one. Even if they are given the exact same
>> message, the CRCs returned would be different.
>>
>> Also, the Kafka broker will modify the system-appended bytes in different
>> scenarios, such as a compression codec change or message format conversion
>> (after KIP-31 and KIP-32).
>>
>> So my concern is that we are exposing a CRC that includes system-appended
>> bytes to the user.
>>
>> Other than this I think everything looks good. Nice work, Anna.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy  wrote:
>>
>> > Responding to some of the earlier comments in the thread:
>> >
>> > @Jay/@Neha,
>> >
>> > I think any one of onCommit/onAppend/onArrival would work for the
>> concrete
>> > use-case that I had outlined. I think onArrival is additionally useful
>> for
>> > custom validation - i.e., reject the message and do not append if it
>> > violates some cluster-specific rule (e.g., if some header timestamp is
>> > older than xyz). However, the catch with user-supplied validation is
>> > that we would have to make do with a (new) generic error code in the
>> > producer response.
>> > While there is a risk of a broker interceptor having high latency I
>> think
>> > that is acceptable since it is the user's responsibility to ensure low
>> > latency - the producer call-back and onAcknowledgment interceptor are
>> > similar in this regard although those are less risky. Even so, I think
>> > there are clear use-cases for broker interceptors so I feel the risk
>> part
>> > is something that just needs to be documented. @Jay that is a good point
>> > about moving from Message/MessageSet to Records although that may be
>> less
>> > difficult to absorb since it is a broker-side interceptor and so people
>> > don't need to get a ton of applications in their company to switch to
>> use
>> > it.
>> >
>> > Re: onEnqueued: monitoring serialization latency can be done via metrics
>> > but this is more useful for recording whether serialization succeeded or
>> > not. onAcknowledgment subsumes this but it also subsumes other possible
>> > errors (such as produce errors). It is more fine-grained than most
>> people
>> > need though (i.e., I don't think we will use it even if it is present.)
>> >
>> > Re: checksums: I think it is a good addition to the metadata; and for
>> > low-volume or super-critical topics it can be used for very strict
>> > auditing.
>> >
>> > There are a couple of typos/edits for the wiki itself:
>> >
>> >- Under Kafka Producer changes:
>> >- you have references to KafkaConsumer constructor and
>> >   ConsumerConfig.originals.
>> >   - sendRecord -> sentRecord (may be clearer)
>> >- Under ProducerInterceptor interface: there is a mention of
>> onEnqueued
>> >which was rejected
>> >- Comment for ConsumerRecord.record should probably be: // NEW:
>> record
>> >size in bytes (*after decompression*)
>> >
>> >
>> > BTW - Anna, nice work on the KIP!
>> >
>> > Joel
>> >
>> > On Fri, Jan 29, 2016 at 6:57 AM, Neha Narkhede 
>> wrote:
>> >
>> > > Becket,
>> > >
>> > > Is your concern the presence of CRC in the RecordMetadata or do you
>> want
>> > to
>> > > brainstorm how CRC can be used for 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-29 Thread Becket Qin
Neha,

CRC is definitely an important type of metadata for a record. I am not arguing
about that. But I think we should distinguish between two types of checksum
here: 1) a checksum of the user data, and 2) a checksum that includes
system-appended bytes.

I completely agree that (1) is good to add. But I am not sure if we should
expose (2) to the user, because this means any underlying protocol change will
give a different CRC for the exact same message. For example, let's say
producer A is sending messages with a timestamp and producer B is sending
messages without one. Even if they are given the exact same message, the CRCs
returned would be different.

Also, the Kafka broker will modify the system-appended bytes in different
scenarios, such as a compression codec change or message format conversion
(after KIP-31 and KIP-32).

So my concern is with exposing a CRC that includes system-appended bytes to
the user.

Other than this I think everything looks good. Nice work, Anna.

Thanks,

Jiangjie (Becket) Qin

On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy  wrote:

> Responding to some of the earlier comments in the thread:
>
> @Jay/@Neha,
>
> I think any one of onCommit/onAppend/onArrival would work for the concrete
> use-case that I had outlined. I think onArrival is additionally useful for
> custom validation - i.e., reject the message and do not append if it
> violates some cluster-specific rule (e.g., if some header timestamp is
> older than xyz). However, the catch with user-supplied validation is that we
> would have to make do with a (new) generic error code in the producer
> response.
> While there is a risk of a broker interceptor having high latency I think
> that is acceptable since it is the user's responsibility to ensure low
> latency - the producer call-back and onAcknowledgment interceptor are
> similar in this regard although those are less risky. Even so, I think
> there are clear use-cases for broker interceptors so I feel the risk part
> is something that just needs to be documented. @Jay that is a good point
> about moving from Message/MessageSet to Records although that may be less
> difficult to absorb since it is a broker-side interceptor and so people
> don't need to get a ton of applications in their company to switch to use
> it.
>
> Re: onEnqueued: monitoring serialization latency can be done via metrics
> but this is more useful for recording whether serialization succeeded or
> not. onAcknowledgment subsumes this but it also subsumes other possible
> errors (such as produce errors). It is more fine-grained than most people
> need though (i.e., I don't think we will use it even if it is present.)
>
> Re: checksums: I think it is a good addition to the metadata; and for
> low-volume or super-critical topics it can be used for very strict auditing.
>
> There are a couple of typos/edits for the wiki itself:
>
>- Under Kafka Producer changes:
>- you have references to KafkaConsumer constructor and
>   ConsumerConfig.originals.
>   - sendRecord -> sentRecord (may be clearer)
>- Under ProducerInterceptor interface: there is a mention of onEnqueued
>which was rejected
>- Comment for ConsumerRecord.record should probably be: // NEW: record
>size in bytes (*after decompression*)
>
>
> BTW - Anna, nice work on the KIP!
>
> Joel
>
> On Fri, Jan 29, 2016 at 6:57 AM, Neha Narkhede  wrote:
>
> > Becket,
> >
> > Is your concern the presence of CRC in the RecordMetadata or do you want
> to
> > brainstorm how CRC can be used for auditing? I think we shouldn't try to
> > think about the various ways that people can do monitoring using
> > interceptors and the metadata we provide. The entire point of having
> > pluggable interceptors is so that people can employ their own creative
> > mechanisms to make use of interceptors.
> >
> > I do think that it is worth discussing whether or not CRC makes sense as
> > record metadata to the user. My take is that the CRC is the best
> > size-bounded summary of the serialized record content available to us, and
> > one that would be expensive for the user to recompute. I'd argue this
> > summary of a record qualifies as its metadata. After all, we use the
> > record CRC for a very important durability check as the record travels
> > through the system.
> >
> > 1. Doesn't the TopicPartition + Offset already uniquely identify a
> > message?
> > > It seems better than the CRC from both a summary point of view and an
> > > auditing point of view.
> >
> >
> > The offset is a uniqueness value assigned by the system to the message. If
> you
> > trusted the system that much, you are not looking to monitor it
> out-of-band
> > :-)
> >
> >
> > > 2. Currently the CRC only has 4 bytes, so there will be collisions once
> > > there are more than ~4 billion messages. Take LinkedIn as an example: we
> > > have 1.3 trillion messages per day, so there will be at least a couple
> > > of hundred collisions for each CRC value every day, whereas 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-29 Thread Anna Povzner
Becket,

In your scenario with one message from producer A and one message from
producer B, those are two different messages, and they should be tracked as
two different messages. So I would argue for using the record CRC -- the CRC
that is actually used by the system; it also does not require computing a
different CRC, which would add performance overhead.

If the broker ever changes the CRC, the scenarios when that happens should
be very well defined. As far as I know, the scenarios when CRC is
overwritten by the broker (including KIP-31/32 changes):
-- topic config is LogAppendTime for timestamp type
-- upgrade/downgrade
-- compression codec change (which could be inferred from config).

Monitoring/audit just needs to know when CRCs are safe to use, which is
most often known from the config. In the future, this can be further
addressed by broker interceptors.
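
A small sketch of how audit tooling might encode that "safe to use" decision,
with the per-topic facts derived out-of-band from the configs above
(hypothetical scaffolding, not a Kafka API):

    // Hypothetical guard: skip CRC-based auditing for topics where the broker
    // is known to rewrite records (LogAppendTime timestamps, a codec change,
    // or format up/down-conversion during an upgrade).
    public class CrcAuditGuard {
        public static boolean crcStable(boolean logAppendTime,
                                        boolean codecChangedOnBroker,
                                        boolean formatConversionPossible) {
            return !logAppendTime && !codecChangedOnBroker && !formatConversionPossible;
        }
    }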

Thanks,
Anna




On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin  wrote:

> Neha,
>
> CRC is definitely an important type of metadata for a record. I am not
> arguing about that. But I think we should distinguish between two types of
> checksum here: 1) a checksum of the user data, and 2) a checksum that
> includes system-appended bytes.
>
> I completely agree that (1) is good to add. But I am not sure if we should
> expose (2) to the user, because this means any underlying protocol change
> will give a different CRC for the exact same message. For example, let's say
> producer A is sending messages with a timestamp and producer B is sending
> messages without one. Even if they are given the exact same message, the
> CRCs returned would be different.
>
> Also, the Kafka broker will modify the system-appended bytes in different
> scenarios, such as a compression codec change or message format conversion
> (after KIP-31 and KIP-32).
>
> So my concern is that we are exposing a CRC that includes system-appended
> bytes to the user.
>
> Other than this I think everything looks good. Nice work, Anna.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy  wrote:
>
> > Responding to some of the earlier comments in the thread:
> >
> > @Jay/@Neha,
> >
> > I think any one of onCommit/onAppend/onArrival would work for the
> concrete
> > use-case that I had outlined. I think onArrival is additionally useful
> for
> > custom validation - i.e., reject the message and do not append if it
> > violates some cluster-specific rule (e.g., if some header timestamp is
> > older than xyz). However, the catch with user-supplied validation is that
> > we would have to make do with a (new) generic error code in the producer
> > response.
> > While there is a risk of a broker interceptor having high latency I think
> > that is acceptable since it is the user's responsibility to ensure low
> > latency - the producer call-back and onAcknowledgment interceptor are
> > similar in this regard although those are less risky. Even so, I think
> > there are clear use-cases for broker interceptors so I feel the risk part
> > is something that just needs to be documented. @Jay that is a good point
> > about moving from Message/MessageSet to Records although that may be less
> > difficult to absorb since it is a broker-side interceptor and so people
> > don't need to get a ton of applications in their company to switch to use
> > it.
> >
> > Re: onEnqueued: monitoring serialization latency can be done via metrics
> > but this is more useful for recording whether serialization succeeded or
> > not. onAcknowledgment subsumes this but it also subsumes other possible
> > errors (such as produce errors). It is more fine-grained than most people
> > need though (i.e., I don't think we will use it even if it is present.)
> >
> > Re: checksums: I think it is a good addition to the metadata; and for
> > low-volume or super-critical topics it can be used for very strict
> > auditing.
> >
> > There are a couple of typos/edits for the wiki itself:
> >
> >- Under Kafka Producer changes:
> >- you have references to KafkaConsumer constructor and
> >   ConsumerConfig.originals.
> >   - sendRecord -> sentRecord (may be clearer)
> >- Under ProducerInterceptor interface: there is a mention of
> onEnqueued
> >which was rejected
> >- Comment for ConsumerRecord.record should probably be: // NEW: record
> >size in bytes (*after decompression*)
> >
> >
> > BTW - Anna, nice work on the KIP!
> >
> > Joel
> >
> > On Fri, Jan 29, 2016 at 6:57 AM, Neha Narkhede 
> wrote:
> >
> > > Becket,
> > >
> > > Is your concern the presence of CRC in the RecordMetadata or do you
> want
> > to
> > > brainstorm how CRC can be used for auditing? I think we shouldn't try
> to
> > > think about the various ways that people can do monitoring using
> > > interceptors and the metadata we provide. The entire point of having
> > > pluggable interceptors is so that people can employ their own creative
> > > mechanisms to make use of interceptors.
> > >
> > > I do think 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-28 Thread Becket Qin
I am +1 on #1.2 and #3.

#2: Regarding the CRC, I am not sure if users care about it. Is there any
specific use case? Currently we validate messages by calling ensureValid()
to verify the checksum, and throw an exception if it does not match.

Message size would be useful. We can add that to ConsumerRecord. Can you
clarify which message size you are referring to? Does it include the message
header overhead or not? From the user's point of view, they probably don't
care about the header size.
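
For context, a conceptual sketch of what such a validity check amounts to in
the old message format, where the 4-byte CRC is the record's first field and
covers everything after it (illustrative only, not a copy of the actual
implementation):

    import java.nio.ByteBuffer;
    import java.util.zip.CRC32;

    public class RecordChecksumCheck {
        // 'record' holds one record with its 4-byte CRC as the first field.
        public static boolean isValid(ByteBuffer record) {
            long stored = record.getInt(record.position()) & 0xffffffffL;
            ByteBuffer rest = record.duplicate();
            rest.position(rest.position() + 4);   // skip the stored CRC field
            byte[] bytes = new byte[rest.remaining()];
            rest.get(bytes);
            CRC32 crc = new CRC32();
            crc.update(bytes);                    // recompute over the covered bytes
            return crc.getValue() == stored;
        }
    }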

Thanks,

Jiangjie (Becket) Qin


On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede  wrote:

> Anna,
>
> Thanks for being diligent.
>
> +1 on #1.2 and sounds good on #3. I recommend adding checksum and size
> fields to RecordMetadata and ConsumerRecord instead of exposing metadata
> piecemeal in the interceptor APIs.
>
> Thanks,
> Neha
>
> On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner  wrote:
>
> > Hi All,
> >
> > The KIP wiki page is now up-to-date with the scope we have agreed on:
> > Producer and Consumer Interceptors with a minimal set of mutable API that
> > are not dependent on producer and consumer internal implementation.
> >
> > I have a few more API details that I would like to bring attention to
> > and/or discuss:
> >
> > 1. Handling exceptions
> >
> > Exceptions can provide an additional level of control. For example, we
> can
> > filter messages on consumer side or stop messages on producer if they
> don’t
> > have the right field.
> >
> > I see two options:
> > 1.1. For callbacks that can mutate records (onSend and onConsume),
> > propagate exceptions through the original calls (KafkaProducer.send() and
> > KafkaConsumer.poll() respectively). For other callbacks, catch exception,
> > log, and ignore.
> > 1.2. Catch exceptions from all the interceptor callbacks and ignore.
> >
> > The issue with 1.1. is that it effectively changes KafkaProducer.send()
> and
> > KafkaConsumer.poll() API, since now they may throw exceptions that are
> not
> > documented in KafkaProducer/Consumer API. Another option is to allow to
> > propagate some exceptions, and ignore others.
> >
> > I think our use-cases do not require propagating exceptions. So, I
> propose
> > option 1.2. Unless someone has suggestions/use-cases for propagating
> > exceptions. Please let me know.
> >
> > 2. Intercepting record CRC and record size
> >
> > Since we decided not to add any intermediate callbacks (such as onEnqueue
> > or onReceive) to interceptors, I think it is still valuable to intercept
> > record CRC and record size in bytes for monitoring and audit use-cases.
> >
> > I propose to add checksum and size fields to RecordMetadata and
> > ConsumerRecord. Another option would be to add them as parameters in
> > onAcknowledgement() and onConsume() callbacks.
> >
> > 3. Callbacks that allow to modify records look as follows:
> > ProducerRecord onSend(ProducerRecord record);
> > ConsumerRecords onConsume(ConsumerRecords records);
> >
> > This means that interceptors can potentially modify topic/partition in
> > ProducerRecord and topic/partition/offset in ConsumerRecord. I propose
> that
> > it is up to the interceptor implementation to ensure that
> topic/partition,
> > etc is correct. KafkaProducer.send() will use topic, partition, key, and
> > value from ProducerRecord returned from the onSend(). Similarly,
> > ConsumerRecords returned from KafkaConsumer.poll() would be the ones
> > returned from the interceptor.
> >
> >
> >
> > Please let me know if you have any suggestions or objections to the
> above.
> >
> > Thanks,
> > Anna
> >
> >
> > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner  wrote:
> >
> > > Hi Mayuresh,
> > >
> > > I see why you would want to check for messages left in the
> > > RecordAccumulator. However, I don't think this will completely solve the
> > > problem. Messages could be in flight somewhere else, like in the socket,
> > > or there may be in-flight messages on the consumer side of the
> > > MirrorMaker. So, if we go the route of checking whether there are any
> > > in-flight messages for the topic deletion use-case, maybe it is better
> > > to count them with onSend() and onAcknowledge() -- whether all messages
> > > sent were acknowledged. I also
> > > think that it would be better to solve this without interceptors, such
> as
> > > fix error handling in this scenario. However, I do not have any good
> > > proposal right now, so these are just general thoughts.
> > >
> > > Thanks,
> > > Anna
> > >
> > >
> > >
> > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com> wrote:
> > >
> > >> Calling producer.flush() flushes all the data, so this is OK. But when
> > >> you are running MirrorMaker, I am not sure there is a way to flush()
> > >> from outside.
> > >>
> > >>
> > >> Thanks,
> > >>
> > >> Mayuresh
> > >>
> > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin 
> > >> wrote:
> > >>
> > 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-28 Thread Anna Povzner
Hi Becket,

The use-case for the CRC is end-to-end audit, rather than checking whether a
single message is corrupt or not.

Regarding record size, I was thinking of extracting the record size from
Record. That will include the header overhead as well. I think the total
record size will tell users how much bandwidth their messages take. Since the
header is relatively small and constant, users will also get an idea of their
key/value sizes.

Thanks,
Anna

On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin  wrote:

> I am +1 on #1.2 and #3.
>
> #2: Regarding the CRC, I am not sure if users care about it. Is there any
> specific use case? Currently we validate messages by calling ensureValid()
> to verify the checksum, and throw an exception if it does not match.
>
> Message size would be useful. We can add that to ConsumerRecord. Can you
> clarify which message size you are referring to? Does it include the message
> header overhead or not? From the user's point of view, they probably don't
> care about the header size.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede  wrote:
>
> > Anna,
> >
> > Thanks for being diligent.
> >
> > +1 on #1.2 and sounds good on #3. I recommend adding checksum and size
> > fields to RecordMetadata and ConsumerRecord instead of exposing metadata
> > piecemeal in the interceptor APIs.
> >
> > Thanks,
> > Neha
> >
> > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner  wrote:
> >
> > > Hi All,
> > >
> > > The KIP wiki page is now up-to-date with the scope we have agreed on:
> > > Producer and Consumer Interceptors with a minimal set of mutable API
> that
> > > are not dependent on producer and consumer internal implementation.
> > >
> > > I have a few more API details that I would like to bring attention to
> > > and/or discuss:
> > >
> > > 1. Handling exceptions
> > >
> > > Exceptions can provide an additional level of control. For example, we
> > can
> > > filter messages on consumer side or stop messages on producer if they
> > don’t
> > > have the right field.
> > >
> > > I see two options:
> > > 1.1. For callbacks that can mutate records (onSend and onConsume),
> > > propagate exceptions through the original calls (KafkaProducer.send()
> and
> > > KafkaConsumer.poll() respectively). For other callbacks, catch
> exception,
> > > log, and ignore.
> > > 1.2. Catch exceptions from all the interceptor callbacks and ignore.
> > >
> > > The issue with 1.1. is that it effectively changes KafkaProducer.send()
> > and
> > > KafkaConsumer.poll() API, since now they may throw exceptions that are
> > not
> > > documented in KafkaProducer/Consumer API. Another option is to allow to
> > > propagate some exceptions, and ignore others.
> > >
> > > I think our use-cases do not require propagating exceptions. So, I
> > propose
> > > option 1.2. Unless someone has suggestions/use-cases for propagating
> > > exceptions. Please let me know.
> > >
> > > 2. Intercepting record CRC and record size
> > >
> > > Since we decided not to add any intermediate callbacks (such as
> onEnqueue
> > > or onReceive) to interceptors, I think it is still valuable to
> intercept
> > > record CRC and record size in bytes for monitoring and audit use-cases.
> > >
> > > I propose to add checksum and size fields to RecordMetadata and
> > > ConsumerRecord. Another option would be to add them as parameters in
> > > onAcknowledgement() and onConsume() callbacks.
> > >
> > > 3. Callbacks that allow to modify records look as follows:
> > > ProducerRecord onSend(ProducerRecord record);
> > > ConsumerRecords onConsume(ConsumerRecords records);
> > >
> > > This means that interceptors can potentially modify topic/partition in
> > > ProducerRecord and topic/partition/offset in ConsumerRecord. I propose
> > that
> > > it is up to the interceptor implementation to ensure that
> > topic/partition,
> > > etc is correct. KafkaProducer.send() will use topic, partition, key,
> and
> > > value from ProducerRecord returned from the onSend(). Similarly,
> > > ConsumerRecords returned from KafkaConsumer.poll() would be the ones
> > > returned from the interceptor.
> > >
> > >
> > >
> > > Please let me know if you have any suggestions or objections to the
> > above.
> > >
> > > Thanks,
> > > Anna
> > >
> > >
> > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner 
> wrote:
> > >
> > > > Hi Mayuresh,
> > > >
> > > > I see why you would want to check for messages left in the
> > > > RecordAccumulator. However, I don't think this will completely solve
> > > > the problem. Messages could be in flight somewhere else, like in the
> > > > socket, or there may be in-flight messages on the consumer side of the
> > > > MirrorMaker. So, if we go the route of checking whether there are any
> > > > in-flight messages for the topic deletion use-case, maybe it is better
> > > > to count them with onSend() and onAcknowledge() -- whether all 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-28 Thread Becket Qin
Anna,

Using the CRC to do end-to-end auditing might be very costly, because you will
need to collect all the CRCs from both the producer and the consumer. And it
is based on the assumption that the broker does not modify the record.
Can you shed some light on how end-to-end auditing would use the CRC before we
decide to expose such low-level detail to the end user? It would also be
helpful if you could compare it with something like sequence-number-based
auditing.

About the record size, one thing worth noting is that the size of a Record is
not the actual number of bytes sent over the wire if we use compression, so it
does not really tell the user how much bandwidth they are using. Personally I
think two kinds of size may be useful:
1. The record size after serialization, i.e. application bytes. (The
uncompressed record size can be easily derived as well.)
2. The actual bytes sent over the wire.
We can get (1) easily, but (2) is difficult to get at the Record level when we
use compression.
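
As a sketch of why (2) can only ever be estimated, here is the proportional
approximation Anna suggests later in the thread -- allocate the batch's
compressed bytes to each record by its share of the uncompressed bytes (an
estimate, not actual wire bytes):

    public class WireBytesEstimate {
        // Approximate the wire bytes of one record inside a compressed batch
        // by allocating the batch's compressed size proportionally to the
        // record's share of the uncompressed bytes.
        public static long estimate(long recordUncompressedBytes,
                                    long batchUncompressedBytes,
                                    long batchCompressedBytes) {
            return Math.round(batchCompressedBytes
                    * ((double) recordUncompressedBytes / batchUncompressedBytes));
        }
    }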

Thanks,

Jiangjie (Becket) Qin

On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner  wrote:

> Hi Becket,
>
> The use-case for the CRC is end-to-end audit, rather than checking whether a
> single message is corrupt or not.
>
> Regarding record size, I was thinking of extracting the record size from
> Record. That will include the header overhead as well. I think the total
> record size will tell users how much bandwidth their messages take. Since
> the header is relatively small and constant, users will also get an idea of
> their key/value sizes.
>
> Thanks,
> Anna
>
> On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin  wrote:
>
> > I am +1 on #1.2 and #3.
> >
> > #2: Regarding the CRC, I am not sure if users care about it. Is there any
> > specific use case? Currently we validate messages by calling
> > ensureValid() to verify the checksum, and throw an exception if it does
> > not match.
> >
> > Message size would be useful. We can add that to ConsumerRecord. Can you
> > clarify which message size you are referring to? Does it include the
> > message header overhead or not? From the user's point of view, they
> > probably don't care about the header size.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede 
> wrote:
> >
> > > Anna,
> > >
> > > Thanks for being diligent.
> > >
> > > +1 on #1.2 and sounds good on #3. I recommend adding checksum and size
> > > fields to RecordMetadata and ConsumerRecord instead of exposing
> metadata
> > > piecemeal in the interceptor APIs.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner 
> wrote:
> > >
> > > > Hi All,
> > > >
> > > > The KIP wiki page is now up-to-date with the scope we have agreed on:
> > > > Producer and Consumer Interceptors with a minimal set of mutable API
> > that
> > > > are not dependent on producer and consumer internal implementation.
> > > >
> > > > I have a few more API details that I would like to bring to your
> > > > attention and/or discuss:
> > > >
> > > > 1. Handling exceptions
> > > >
> > > > Exceptions can provide an additional level of control. For example,
> we
> > > can
> > > > filter messages on consumer side or stop messages on producer if they
> > > don’t
> > > > have the right field.
> > > >
> > > > I see two options:
> > > > 1.1. For callbacks that can mutate records (onSend and onConsume),
> > > > propagate exceptions through the original calls (KafkaProducer.send()
> > and
> > > > KafkaConsumer.poll() respectively). For other callbacks, catch
> > exception,
> > > > log, and ignore.
> > > > 1.2. Catch exceptions from all the interceptor callbacks and ignore.
> > > >
> > > > The issue with 1.1. is that it effectively changes the
> > > > KafkaProducer.send() and KafkaConsumer.poll() APIs, since now they may
> > > > throw exceptions that are not documented in the KafkaProducer/Consumer
> > > > API. Another option is to allow some exceptions to propagate and ignore
> > > > others.
> > > >
> > > > I think our use-cases do not require propagating exceptions. So, I
> > > propose
> > > > option 1.2. Unless someone has suggestion/use-cases for propagating
> > > > exceptions. Please let me know.
> > > >
> > > > 2. Intercepting record CRC and record size
> > > >
> > > > Since we decided not to add any intermediate callbacks (such as
> > onEnqueue
> > > > or onReceive) to interceptors, I think it is still valuable to
> > intercept
> > > > record CRC and record size in bytes for monitoring and audit
> use-cases.
> > > >
> > > > I propose to add checksum and size fields to RecordMetadata and
> > > > ConsumerRecord. Another option would be to add them as parameters in
> > > > onAcknowledgement() and onConsume() callbacks.
> > > >
> > > > 3. Callbacks that allow modifying records look as follows:
> > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> > > > ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
> > > >
> > > > This means that 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-28 Thread Anna Povzner
Regarding record size as bytes sent over the wire, my concern is that it is
almost impossible to calculate per record. We could estimate it as: 1) compressed
bytes divided by the number of records in a compressed message, as Todd mentioned;
or 2) the same as #1 but proportional to each record's uncompressed size vs. the
total uncompressed size of all records. All of these calculations give us an
estimate. So maybe record size as bytes sent over the wire is not per-record
metadata, but rather a per-topic/partition measure that is better exposed
through metrics?
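
For concreteness, the two estimates could look like the following illustrative
helper; the class and method names are invented, nothing like this is proposed
in the KIP:

public final class WireBytesEstimate {
    // Estimate 1: split the compressed batch size evenly across records.
    static double evenSplit(long batchCompressedBytes, int recordCount) {
        return (double) batchCompressedBytes / recordCount;
    }

    // Estimate 2: split proportionally to each record's share of the
    // batch's total uncompressed bytes.
    static double proportional(long batchCompressedBytes,
                               long recordUncompressedBytes,
                               long totalUncompressedBytes) {
        return batchCompressedBytes
                * ((double) recordUncompressedBytes / totalUncompressedBytes);
    }
}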


On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino  wrote:

> It may be difficult (or nearly impossible) to get actual compressed bytes
> for a message from a compressed batch, but I do think it’s useful
> information to have available for the very reason noted, bandwidth
> consumed. Does it make sense to have an interceptor at the batch level that
> can provide this? The other option is to estimate it (such as making an
> assumption that the messages in a batch are equal in size, which is not
> necessarily true), which is probably not the right answer.
>
> -Todd
>
>
> On Thu, Jan 28, 2016 at 1:48 PM, Anna Povzner  wrote:
>
> > Hi Becket,
> >
> > It will be up to the interceptor to implement their audit or monitoring
> > strategy. I would also imagine there is more than one good way to do
> audit.
> > So, I agree that some of the interceptors may not use CRC, and we will
> not
> > require it. The question is now whether intercepting CRCs is needed. I
> > think they are very useful for monitoring and audit, because CRC provides
> > an easy way to get a summary of a message, rather than using message
> > bytes or key/value objects.
> >
> > Regarding record size, I agree that the bandwidth example was not a good
> one. I
> > think it would be hard to get actual bytes sent over the wire (your #2),
> > since multiple records get compressed together and we would need to
> decide
> > which bytes to account to which record. So I am inclined to only do your
> > #1. However, it still makes more sense to me just to return record size
> > including the header, since this is the actual record size.
> >
> > Thanks,
> > Anna
> >
> > On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin 
> wrote:
> >
> > > Anna,
> > >
> > > Using CRC to do end-to-end auditing might be very costly because you will
> > need
> > > to collect all the CRC from both producer and consumer. And it is based
> > on
> > > the assumption that broker does not modify the record.
> > > Can you shed some light on how end-to-end auditing will use the CRC
> > > before we decide to expose such low level detail to the end user? It
> > would
> > > also be helpful if you can compare it with something like sequence
> number
> > > based auditing.
> > >
> > > About the record size, one thing worth noting is that the size of
> Record
> > is
> > > not the actual bytes sent over the wire if we use compression. So that
> > does
> > > not really tell users how much bandwidth they are using. Personally I
> > think
> > > two kinds of size may be useful.
> > > 1. The record size after serialization, i.e. application bytes. (The
> > > uncompressed record size can be easily derived as well)
> > > 2. The actual bytes sent over the wire.
> > > We can get (1) easily, but (2) is difficult to get at Record level when
> > we
> > > use compression.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner 
> > wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > The use-case for CRC is end-to-end audit, rather than checking
> whether
> > a
> > > > single message is corrupt or not.
> > > >
> > > > Regarding record size, I was thinking of extracting the record size from
> > Record.
> > > > That will include header overhead as well. I think total record size
> > will
> > > > tell users how much bandwidth their messages take. Since header is
> > > > relatively small and constant, users also will get an idea of their
> > > > key/value sizes.
> > > >
> > > > Thanks,
> > > > Anna
> > > >
> > > > On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin 
> > > wrote:
> > > >
> > > > > I am +1 on #1.2 and #3.
> > > > >
> > > > > #2: Regarding CRC, I am not sure if users care about CRC. Is there
> > any
> > > > > specific use case? Currently we validate messages by calling
> > > > ensureValid()
> > > > > to verify the checksum and throw exception if it does not match.
> > > > >
> > > > > Message size would be useful. We can add that to ConsumerRecord.
> Can
> > > you
> > > > > clarify the message size you are referring to? Does it include the
> > > > message
> > > > > header overhead or not? From user's point of view, they probably
> > don't
> > > > care
> > > > > about header size.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-28 Thread Todd Palino
It may be difficult (or nearly impossible) to get actual compressed bytes
for a message from a compressed batch, but I do think it’s useful
information to have available for the very reason noted, bandwidth
consumed. Does it make sense to have an interceptor at the batch level that
can provide this? The other option is to estimate it (such as making an
assumption that the messages in a batch are equal in size, which is not
necessarily true), which is probably not the right answer.
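
For what it's worth, a batch-level hook might look like the sketch below; this
interface is purely hypothetical and is not part of the KIP:

public interface BatchInterceptor {
    // Called once per produced batch with the actual compressed wire size,
    // which is known at the batch level even though it is not per record.
    void onBatchSend(String topic, int partition,
                     int recordCount, long compressedBytes);
}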

-Todd


On Thu, Jan 28, 2016 at 1:48 PM, Anna Povzner  wrote:

> Hi Becket,
>
> It will be up to the interceptor to implement their audit or monitoring
> strategy. I would also imagine there is more than one good way to do audit.
> So, I agree that some of the interceptors may not use CRC, and we will not
> require it. The question is now whether intercepting CRCs is needed. I
> think they are very useful for monitoring and audit, because CRC provides
> an easy way to get a summary of a message, rather than using message
> bytes or key/value objects.
>
> Regarding record size, I agree that the bandwidth example was not a good one. I
> think it would be hard to get actual bytes sent over the wire (your #2),
> since multiple records get compressed together and we would need to decide
> which bytes to account to which record. So I am inclined to only do your
> #1. However, it still makes more sense to me just to return record size
> including the header, since this is the actual record size.
>
> Thanks,
> Anna
>
> On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin  wrote:
>
> > Anna,
> >
> > Using CRC to do end-to-end auditing might be very costly because you will
> need
> > to collect all the CRC from both producer and consumer. And it is based
> on
> > the assumption that broker does not modify the record.
> > Can you shed some light on how end-to-end auditing will use the CRC
> > before we decide to expose such low level detail to the end user? It
> would
> > also be helpful if you can compare it with something like sequence number
> > based auditing.
> >
> > About the record size, one thing worth noting is that the size of Record
> is
> > not the actual bytes sent over the wire if we use compression. So that
> does
> > not really tell users how much bandwidth they are using. Personally I
> think
> > two kinds of size may be useful.
> > 1. The record size after serialization, i.e. application bytes. (The
> > uncompressed record size can be easily derived as well)
> > 2. The actual bytes sent over the wire.
> > We can get (1) easily, but (2) is difficult to get at Record level when
> we
> > use compression.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner 
> wrote:
> >
> > > Hi Becket,
> > >
> > > The use-case for CRC is end-to-end audit, rather than checking whether
> a
> > > single message is corrupt or not.
> > >
> > > Regarding record size, I was thinking of extracting the record size from
> Record.
> > > That will include header overhead as well. I think total record size
> will
> > > tell users how much bandwidth their messages take. Since header is
> > > relatively small and constant, users also will get an idea of their
> > > key/value sizes.
> > >
> > > Thanks,
> > > Anna
> > >
> > > On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin 
> > wrote:
> > >
> > > > I am +1 on #1.2 and #3.
> > > >
> > > > #2: Regarding CRC, I am not sure if users care about CRC. Is there
> any
> > > > specific use case? Currently we validate messages by calling
> > > ensureValid()
> > > > to verify the checksum and throw exception if it does not match.
> > > >
> > > > Message size would be useful. We can add that to ConsumerRecord. Can
> > you
> > > > clarify the message size you are referring to? Does it include the
> > > message
> > > > header overhead or not? From user's point of view, they probably
> don't
> > > care
> > > > about header size.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede 
> > > wrote:
> > > >
> > > > > Anna,
> > > > >
> > > > > Thanks for being diligent.
> > > > >
> > > > > +1 on #1.2 and sounds good on #3. I recommend adding checksum and
> > size
> > > > > fields to RecordMetadata and ConsumerRecord instead of exposing
> > > metadata
> > > > > piecemeal in the interceptor APIs.
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner 
> > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > The KIP wiki page is now up-to-date with the scope we have agreed
> > on:
> > > > > > Producer and Consumer Interceptors with a minimal set of mutable
> > API
> > > > that
> > > > > > are not dependent on producer and consumer internal
> implementation.
> > > > > >
> > > > > > I have a few more API details that I would like to bring attention

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-28 Thread Anna Povzner
Hi Becket,

It will be up to the interceptor to implement its audit or monitoring
strategy. I would also imagine there is more than one good way to do audit.
So, I agree that some of the interceptors may not use CRC, and we will not
require it. The question now is whether intercepting CRCs is needed. I
think they are very useful for monitoring and audit, because a CRC provides
an easy way to get a summary of a message, rather than using message
bytes or key/value objects.
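
As a sketch of that idea, an audit pipeline could collect a compact
per-message summary on both ends and compare the sets; java.util.zip.CRC32
over the serialized value stands in here for the record-level CRC the KIP
would expose:

import java.util.zip.CRC32;

public class CrcSummary {
    // Compute a compact summary of a serialized message; an audit system
    // would compare producer-side and consumer-side summary sets.
    public static long summarize(byte[] serializedValue) {
        CRC32 crc = new CRC32();
        crc.update(serializedValue);
        return crc.getValue();
    }
}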

Regarding record size, I agree that the bandwidth example was not a good one. I
think it would be hard to get the actual bytes sent over the wire (your #2),
since multiple records get compressed together and we would need to decide
which bytes to account to which record. So I am inclined to only do your
#1. However, it still makes more sense to me to just return the record size
including the header, since this is the actual record size.

Thanks,
Anna

On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin  wrote:

> Anna,
>
> Using CRC to do end-to-end auditing might be very costly because you will need
> to collect all the CRC from both producer and consumer. And it is based on
> the assumption that broker does not modify the record.
> Can you shed some light on how end-to-end auditing will use the CRC
> before we decide to expose such low level detail to the end user? It would
> also be helpful if you can compare it with something like sequence number
> based auditing.
>
> About the record size, one thing worth noting is that the size of Record is
> not the actual bytes sent over the wire if we use compression. So that does
> not really tell user how much bandwidth they are using. Personally I think
> two kinds of size may be useful.
> 1. The record size after serialization, i.e. application bytes. (The
> uncompressed record size can be easily derived as well)
> 2. The actual bytes sent over the wire.
> We can get (1) easily, but (2) is difficult to get at Record level when we
> use compression.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner  wrote:
>
> > Hi Becket,
> >
> > The use-case for CRC is end-to-end audit, rather than checking whether a
> > single message is corrupt or not.
> >
> > Regarding record size, I was thinking of extracting the record size from Record.
> > That will include header overhead as well. I think total record size will
> > tell users how much bandwidth their messages take. Since header is
> > relatively small and constant, users also will get an idea of their
> > key/value sizes.
> >
> > Thanks,
> > Anna
> >
> > On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin 
> wrote:
> >
> > > I am +1 on #1.2 and #3.
> > >
> > > #2: Regarding CRC, I am not sure if users care about CRC. Is there any
> > > specific use case? Currently we validate messages by calling
> > ensureValid()
> > > to verify the checksum and throw exception if it does not match.
> > >
> > > Message size would be useful. We can add that to ConsumerRecord. Can
> you
> > > clarify the message size you are referring to? Does it include the
> > message
> > > header overhead or not? From user's point of view, they probably don't
> > care
> > > about header size.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede 
> > wrote:
> > >
> > > > Anna,
> > > >
> > > > Thanks for being diligent.
> > > >
> > > > +1 on #1.2 and sounds good on #3. I recommend adding checksum and
> size
> > > > fields to RecordMetadata and ConsumerRecord instead of exposing
> > metadata
> > > > piecemeal in the interceptor APIs.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner 
> > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > The KIP wiki page is now up-to-date with the scope we have agreed
> on:
> > > > > Producer and Consumer Interceptors with a minimal set of mutable
> API
> > > that
> > > > > are not dependent on producer and consumer internal implementation.
> > > > >
> > > > > I have a few more API details that I would like to bring to your
> > > > > attention and/or discuss:
> > > > >
> > > > > 1. Handling exceptions
> > > > >
> > > > > Exceptions can provide an additional level of control. For example,
> > we
> > > > can
> > > > > filter messages on consumer side or stop messages on producer if
> they
> > > > don’t
> > > > > have the right field.
> > > > >
> > > > > I see two options:
> > > > > 1.1. For callbacks that can mutate records (onSend and onConsume),
> > > > > propagate exceptions through the original calls
> (KafkaProducer.send()
> > > and
> > > > > KafkaConsumer.poll() respectively). For other callbacks, catch
> > > exception,
> > > > > log, and ignore.
> > > > > 1.2. Catch exceptions from all the interceptor callbacks and
> ignore.
> > > > >
> > > > > The issue with 1.1. is that it effectively changes
> > KafkaProducer.send()
> > 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-28 Thread Anna Povzner
On second thought, yes, I think we should expose a record size that
represents application bytes. This is Becket's option #1.

I updated the KIP wiki with new fields in RecordMetadata and ConsumerRecord.

I would like to start a voting thread tomorrow if there are no objections
or more things to discuss.

Thanks,
Anna



On Thu, Jan 28, 2016 at 2:25 PM, Anna Povzner  wrote:

> Regarding record size as bytes sent over the wire, my concern is that it
> is almost impossible to calculate per record. We could estimate it as: 1) compressed
> bytes divided by the number of records in a compressed message, as Todd mentioned;
> or 2) the same as #1 but proportional to each record's uncompressed size vs. the
> total uncompressed size of all records. All of these calculations give us an
> estimate. So maybe record size as bytes sent over the wire is not per-record
> metadata, but rather a per-topic/partition measure that is better exposed
> through metrics?
>
>
> On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino  wrote:
>
>> It may be difficult (or nearly impossible) to get actual compressed bytes
>> for a message from a compressed batch, but I do think it’s useful
>> information to have available for the very reason noted, bandwidth
>> consumed. Does it make sense to have an interceptor at the batch level
>> that
>> can provide this? The other option is to estimate it (such as making an
>> assumption that the messages in a batch are equal in size, which is not
>> necessarily true), which is probably not the right answer.
>>
>> -Todd
>>
>>
>> On Thu, Jan 28, 2016 at 1:48 PM, Anna Povzner  wrote:
>>
>> > Hi Becket,
>> >
>> > It will be up to the interceptor to implement their audit or monitoring
>> > strategy. I would also imagine there is more than one good way to do
>> audit.
>> > So, I agree that some of the interceptors may not use CRC, and we will
>> not
>> > require it. The question is now whether intercepting CRCs is needed. I
>> > think they are very useful for monitoring and audit, because CRC
>> provides
>> > an easy way to get a summary of a message, rather than using message
>> > bytes or key/value objects.
>> >
>> > Regarding record size, I agree that the bandwidth example was not a good
>> one. I
>> > think it would be hard to get actual bytes sent over the wire (your #2),
>> > since multiple records get compressed together and we would need to
>> decide
>> > which bytes to account to which record. So I am inclined to only do your
>> > #1. However, it still makes more sense to me just to return record size
>> > including the header, since this is the actual record size.
>> >
>> > Thanks,
>> > Anna
>> >
>> > On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin 
>> wrote:
>> >
>> > > Anna,
>> > >
>> > > Using CRC to do end-to-end auditing might be very costly because you will
>> > need
>> > > to collect all the CRC from both producer and consumer. And it is
>> based
>> > on
>> > > the assumption that broker does not modify the record.
>> > > Can you shed some light on how end-to-end auditing will use the
>> CRC
>> > > before we decide to expose such low level detail to the end user? It
>> > would
>> > > also be helpful if you can compare it with something like sequence
>> number
>> > > based auditing.
>> > >
>> > > About the record size, one thing worth noting is that the size of
>> Record
>> > is
>> > > not the actual bytes sent over the wire if we use compression. So that
>> > does
>> > > not really tell users how much bandwidth they are using. Personally I
>> > think
>> > > two kinds of size may be useful.
>> > > 1. The record size after serialization, i.e. application bytes. (The
>> > > uncompressed record size can be easily derived as well)
>> > > 2. The actual bytes sent over the wire.
>> > > We can get (1) easily, but (2) is difficult to get at Record level
>> when
>> > we
>> > > use compression.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner 
>> > wrote:
>> > >
>> > > > Hi Becket,
>> > > >
>> > > > The use-case for CRC is end-to-end audit, rather than checking
>> whether
>> > a
>> > > > single message is corrupt or not.
>> > > >
>> > > > Regarding record size, I was thinking of extracting the record size from
>> > Record.
>> > > > That will include header overhead as well. I think total record size
>> > will
>> > > > tell users how much bandwidth their messages take. Since header is
>> > > > relatively small and constant, users also will get an idea of their
>> > > > key/value sizes.
>> > > >
>> > > > Thanks,
>> > > > Anna
>> > > >
>> > > > On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin 
>> > > wrote:
>> > > >
>> > > > > I am +1 on #1.2 and #3.
>> > > > >
>> > > > > #2: Regarding CRC, I am not sure if users care about CRC. Is there
>> > any
>> > > > > specific use case? Currently we validate messages by calling
>> > > > ensureValid()
>> > > > > to verify 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-27 Thread Anna Povzner
Hi All,

The KIP wiki page is now up-to-date with the scope we have agreed on:
Producer and Consumer Interceptors with a minimal set of mutable APIs that
are not dependent on producer and consumer internal implementation.

I have a few more API details that I would like to bring to your attention
and/or discuss:

1. Handling exceptions

Exceptions can provide an additional level of control. For example, we can
filter messages on the consumer side or stop messages on the producer side if
they don’t have the right field.

I see two options:
1.1. For callbacks that can mutate records (onSend and onConsume),
propagate exceptions through the original calls (KafkaProducer.send() and
KafkaConsumer.poll() respectively). For other callbacks, catch exception,
log, and ignore.
1.2. Catch exceptions from all the interceptor callbacks and ignore.

The issue with 1.1. is that it effectively changes the KafkaProducer.send() and
KafkaConsumer.poll() APIs, since now they may throw exceptions that are not
documented in the KafkaProducer/Consumer API. Another option is to allow some
exceptions to propagate and ignore others.

I think our use-cases do not require propagating exceptions. So, I propose
option 1.2, unless someone has suggestions/use-cases for propagating
exceptions. Please let me know.
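
To illustrate option 1.2, here is a minimal sketch of how the client could
wrap interceptor callbacks so no interceptor exception ever reaches
KafkaProducer.send(); the wrapper class and its names are invented, and
ProducerInterceptor refers to the interface proposed in this KIP:

import java.util.List;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InterceptorInvoker<K, V> {
    private static final Logger log = LoggerFactory.getLogger(InterceptorInvoker.class);
    private final List<ProducerInterceptor<K, V>> interceptors;

    public InterceptorInvoker(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    // Option 1.2: any exception thrown by an interceptor is caught, logged,
    // and ignored, so the caller of send() never sees it.
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> current = record;
        for (ProducerInterceptor<K, V> interceptor : interceptors) {
            try {
                current = interceptor.onSend(current);
            } catch (Exception e) {
                log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return current;
    }
}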

2. Intercepting record CRC and record size

Since we decided not to add any intermediate callbacks (such as onEnqueue
or onReceive) to interceptors, I think it is still valuable to intercept the
record CRC and record size in bytes for monitoring and audit use-cases.

I propose to add checksum and size fields to RecordMetadata and
ConsumerRecord. Another option would be to add them as parameters in
onAcknowledgement() and onConsume() callbacks.
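
As a sketch of how an audit interceptor might consume those fields, assuming
accessor names like checksum() and serializedValueSize() (the KIP proposes
the fields, not these exact names):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AuditProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        return record; // pass the record through unchanged on the send path
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null && metadata != null) {
            // checksum() and serializedValueSize() stand in for the proposed
            // new fields on RecordMetadata.
            System.out.printf("audit %s-%d: crc=%d size=%d bytes%n",
                    metadata.topic(), metadata.partition(),
                    metadata.checksum(), metadata.serializedValueSize());
        }
    }

    @Override
    public void close() { }
}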

3. Callbacks that allow modifying records look as follows:
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

This means that interceptors can potentially modify topic/partition in
ProducerRecord and topic/partition/offset in ConsumerRecord. I propose that
it is up to the interceptor implementation to ensure that topic/partition,
etc. are correct. KafkaProducer.send() will use the topic, partition, key, and
value from ProducerRecord returned from the onSend(). Similarly,
ConsumerRecords returned from KafkaConsumer.poll() would be the ones
returned from the interceptor.
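
To make the mutation contract concrete, here is a sketch of an onSend() that
adds origin metadata to the value while keeping topic, partition, and key
intact; the "origin=" tagging convention is invented for illustration:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class OriginTaggingInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // The returned record, not the original, is what the producer sends;
        // keeping topic/partition/key intact is this interceptor's job.
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.key(), "origin=dc1;" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() { }
}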



Please let me know if you have any suggestions or objections to the above.

Thanks,
Anna


On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner  wrote:

> Hi Mayuresh,
>
> I see why you would want to check for messages left in the
> RecordAccumulator. However, I don't think this will completely solve the
> problem. Messages could be in-flight somewhere else, like in the socket, or
> there may be in-flight messages on the consumer side of the MirrorMaker. So,
> if we go the route of checking whether there are any in-flight messages for
> topic deletion use-case, maybe it is better to count them with onSend() and
> onAcknowledge() -- whether all messages sent were acknowledged. I also
> think that it would be better to solve this without interceptors, such as
> fix error handling in this scenario. However, I do not have any good
> proposal right now, so these are just general thoughts.
>
> Thanks,
> Anna
>
>
>
> On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
>> Calling producer.flush() flushes all the data. So this is OK. But when
>> you
>> are running Mirror maker, I am not sure there is a way to flush() from
>> outside.
>>
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin 
>> wrote:
>>
>> > Mayuresh,
>> >
>> > Regarding your use case about mirror maker. Is it good enough as long
>> as we
>> > know there is no message for the topic in the producer anymore? If that
>> is
>> > the case, calling producer.flush() is sufficient.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <
>> > gharatmayures...@gmail.com
>> > > wrote:
>> >
>> > > Hi Anna,
>> > >
>> > > Thanks a lot for summarizing the discussion on this kip.
>> > >
>> > > It LGTM.
>> > > This is really nice :
>> > > We decided not to add any callbacks to producer and consumer
>> > > interceptors that will depend on internal implementation as part of
>> this
>> > > KIP.
>> > > *However, it is possible to add them later as part of another KIP if
>> > there
>> > > are good use-cases.*
>> > >
>> > > Do you agree with the use case I explained earlier for knowing the
>> number
>> > > of records left in the RecordAccumulator for a particular topic? It
>> might
>> > > be orthogonal to this KIP, but will be helpful. What do you think?
>> > >
>> > > Thanks,
>> > >
>> > > Mayuresh
>> > >
>> > >
>> > > On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino 
>> wrote:
>> > >
>> > > > This looks good. As noted, having one mutable interceptor on each
>> side
>> > > > allows for the use 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-27 Thread Anna Povzner
Hi Mayuresh,

I see why you would want to check for messages left in the
RecordAccumulator. However, I don't think this will completely solve the
problem. Messages could be in-flight somewhere else, like in the socket, or
there may be in-flight messages on the consumer side of the MirrorMaker. So,
if we go the route of checking whether there are any in-flight messages for
topic deletion use-case, maybe it is better to count them with onSend() and
onAcknowledge() -- whether all messages sent were acknowledged. I also
think that it would be better to solve this without interceptors, such as
fixing error handling in this scenario. However, I do not have any good
proposal right now, so these are just general thoughts.
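
A sketch of that counting idea, with invented names; an interceptor would
bump these counters from onSend() and onAcknowledgement():

import java.util.concurrent.atomic.AtomicLong;

public class InFlightCounter {
    private final AtomicLong sent = new AtomicLong();
    private final AtomicLong acknowledged = new AtomicLong();

    public void recordSend() { sent.incrementAndGet(); }

    public void recordAck() { acknowledged.incrementAndGet(); }

    // True only when every message handed to send() has been acknowledged,
    // which also covers messages sitting in sockets or other buffers.
    public boolean drained() {
        return sent.get() == acknowledged.get();
    }
}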

Thanks,
Anna



On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Calling producer.flush() flushes all the data. So this is OK. But when you
> are running Mirror maker, I am not sure there is a way to flush() from
> outside.
>
>
> Thanks,
>
> Mayuresh
>
> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin  wrote:
>
> > Mayuresh,
> >
> > Regarding your use case about mirror maker. Is it good enough as long as
> we
> > know there is no message for the topic in the producer anymore? If that
> is
> > the case, calling producer.flush() is sufficient.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com
> > > wrote:
> >
> > > Hi Anna,
> > >
> > > Thanks a lot for summarizing the discussion on this kip.
> > >
> > > It LGTM.
> > > This is really nice :
> > > We decided not to add any callbacks to producer and consumer
> > > interceptors that will depend on internal implementation as part of
> this
> > > KIP.
> > > *However, it is possible to add them later as part of another KIP if
> > there
> > > are good use-cases.*
> > >
> > > Do you agree with the use case I explained earlier for knowing the
> number
> > > of records left in the RecordAccumulator for a particular topic? It
> might
> > > be orthogonal to this KIP, but will be helpful. What do you think?
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > >
> > > On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino 
> wrote:
> > >
> > > > This looks good. As noted, having one mutable interceptor on each
> side
> > > > allows for the use cases we can envision right now, and I think
> that’s
> > > > going to provide a great deal of opportunity for implementing things
> > like
> > > > audit, especially within a multi-tenant environment. Looking forward
> to
> > > > getting this available in the clients.
> > > >
> > > > Thanks!
> > > >
> > > > -Todd
> > > >
> > > >
> > > > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner 
> > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > Here are the meeting notes from today’s KIP meeting:
> > > > >
> > > > > 1. We agreed to keep the scope of this KIP to be producer and
> > consumer
> > > > > interceptors only. Broker-side interceptor will be added later as a
> > > > > separate KIP. The reasons were already mentioned in this thread,
> but
> > > the
> > > > > summary is:
> > > > >  * Broker interceptor is riskier and requires careful consideration
> > > about
> > > > > overheads, whether to intercept leaders vs. leaders/replicas, what
> to
> > > do
> > > > on
> > > > > leader failover and so on.
> > > > >  * Broker interceptors increase monitoring resolution, but not
> > > including
> > > > it
> > > > > in this KIP does not reduce usefulness of producer and consumer
> > > > > interceptors that enable end-to-end monitoring
> > > > >
> > > > > 2. We agreed to scope ProducerInterceptor and ConsumerInterceptor
> > > > callbacks
> > > > > to minimal set of mutable API that are not dependent on producer
> and
> > > > > consumer internal implementation.
> > > > >
> > > > > ProducerInterceptor:
> > > > > *ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);*
> > > > > *void onAcknowledgement(RecordMetadata metadata, Exception
> > exception);*
> > > > >
> > > > > ConsumerInterceptor:
> > > > > *ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);*
> > > > > *void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);*
> > > > >
> > > > > We will allow interceptors to modify ProducerRecord on producer
> side,
> > > and
> > > > > modify ConsumerRecords on consumer side. This will support
> end-to-end
> > > > > monitoring and auditing and support the ability to add metadata
> for a
> > > > > message. This will support Todd’s Auditing and Routing use-cases.
> > > > >
> > > > > We did not find any use-case for modifying records in onConsume()
> > > > callback,
> > > > > but decided to enable modification of consumer records for symmetry
> > > with
> > > > > onSend().
> > > > >
> > > > > 3. We agreed to ensure compatibility when/if we add new methods to
> > > > > ProducerInterceptor and ConsumerInterceptor by using default
> methods
> > > with
> > > > > an empty 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-27 Thread Neha Narkhede
Anna,

Thanks for being diligent.

+1 on #1.2 and sounds good on #3. I recommend adding checksum and size
fields to RecordMetadata and ConsumerRecord instead of exposing metadata
piecemeal in the interceptor APIs.

Thanks,
Neha

On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner  wrote:

> Hi All,
>
> The KIP wiki page is now up-to-date with the scope we have agreed on:
> Producer and Consumer Interceptors with a minimal set of mutable APIs that
> are not dependent on producer and consumer internal implementation.
>
> I have a few more API details that I would like to bring to your attention
> and/or discuss:
>
> 1. Handling exceptions
>
> Exceptions can provide an additional level of control. For example, we can
> filter messages on consumer side or stop messages on producer if they don’t
> have the right field.
>
> I see two options:
> 1.1. For callbacks that can mutate records (onSend and onConsume),
> propagate exceptions through the original calls (KafkaProducer.send() and
> KafkaConsumer.poll() respectively). For other callbacks, catch exception,
> log, and ignore.
> 1.2. Catch exceptions from all the interceptor callbacks and ignore.
>
> The issue with 1.1. is that it effectively changes the KafkaProducer.send() and
> KafkaConsumer.poll() APIs, since now they may throw exceptions that are not
> documented in the KafkaProducer/Consumer API. Another option is to allow some
> exceptions to propagate and ignore others.
>
> I think our use-cases do not require propagating exceptions. So, I propose
> option 1.2. Unless someone has suggestion/use-cases for propagating
> exceptions. Please let me know.
>
> 2. Intercepting record CRC and record size
>
> Since we decided not to add any intermediate callbacks (such as onEnqueue
> or onReceive) to interceptors, I think it is still valuable to intercept
> record CRC and record size in bytes for monitoring and audit use-cases.
>
> I propose to add checksum and size fields to RecordMetadata and
> ConsumerRecord. Another option would be to add them as parameters in
> onAcknowledgement() and onConsume() callbacks.
>
> 3. Callbacks that allow modifying records look as follows:
> ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
>
> This means that interceptors can potentially modify topic/partition in
> ProducerRecord and topic/partition/offset in ConsumerRecord. I propose that
> it is up to the interceptor implementation to ensure that topic/partition,
> etc. are correct. KafkaProducer.send() will use the topic, partition, key, and
> value from ProducerRecord returned from the onSend(). Similarly,
> ConsumerRecords returned from KafkaConsumer.poll() would be the ones
> returned from the interceptor.
>
>
>
> Please let me know if you have any suggestions or objections to the above.
>
> Thanks,
> Anna
>
>
> On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner  wrote:
>
> > Hi Mayuresh,
> >
> > I see why you would want to check for messages left in the
> > RecordAccumulator. However, I don't think this will completely solve the
> > problem. Messages could be in-flight somewhere else, like in the socket,
> or
> > there may be in-flight messages on the consumer side of the MirrorMaker.
> So,
> > if we go the route of checking whether there are any in-flight messages
> for
> > topic deletion use-case, maybe it is better to count them with onSend() and
> > onAcknowledge() -- whether all messages sent were acknowledged. I also
> > think that it would be better to solve this without interceptors, such as
> > fixing error handling in this scenario. However, I do not have any good
> > proposal right now, so these are just general thoughts.
> >
> > Thanks,
> > Anna
> >
> >
> >
> > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> >> Calling producer.flush() flushes all the data. So this is OK. But when
> >> you
> >> are running Mirror maker, I am not sure there is a way to flush() from
> >> outside.
> >>
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin 
> >> wrote:
> >>
> >> > Mayuresh,
> >> >
> >> > Regarding your use case about mirror maker. Is it good enough as long
> >> as we
> >> > know there is no message for the topic in the producer anymore? If
> that
> >> is
> >> > the case, calling producer.flush() is sufficient.
> >> >
> >> > Thanks,
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <
> >> > gharatmayures...@gmail.com
> >> > > wrote:
> >> >
> >> > > Hi Anna,
> >> > >
> >> > > Thanks a lot for summarizing the discussion on this kip.
> >> > >
> >> > > It LGTM.
> >> > > This is really nice :
> >> > > We decided not to add any callbacks to producer and consumer
> >> > > interceptors that will depend on internal implementation as part of
> >> this
> >> > > KIP.
> >> > > *However, it is possible to add them later as part of another KIP if
> >> > 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-27 Thread Becket Qin
Mayuresh,

Regarding your use case about mirror maker. Is it good enough as long as we
know there is no message for the topic in the producer anymore? If that is
the case, calling producer.flush() is sufficient.
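
A minimal sketch of the flush() approach (the broker address and topic are
placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FlushExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("some-topic", "key", "value"));
            // flush() blocks until all previously sent records have completed,
            // so afterwards nothing remains buffered in the producer.
            producer.flush();
        }
    }
}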

Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat  wrote:

> Hi Anna,
>
> Thanks a lot for summarizing the discussion on this kip.
>
> It LGTM.
> This is really nice :
> We decided not to add any callbacks to producer and consumer
> interceptors that will depend on internal implementation as part of this
> KIP.
> *However, it is possible to add them later as part of another KIP if there
> are good use-cases.*
>
> Do you agree with the use case I explained earlier for knowing the number
> of records left in the RecordAccumulator for a particular topic? It might
> be orthogonal to this KIP, but will be helpful. What do you think?
>
> Thanks,
>
> Mayuresh
>
>
> On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino  wrote:
>
> > This looks good. As noted, having one mutable interceptor on each side
> > allows for the use cases we can envision right now, and I think that’s
> > going to provide a great deal of opportunity for implementing things like
> > audit, especially within a multi-tenant environment. Looking forward to
> > getting this available in the clients.
> >
> > Thanks!
> >
> > -Todd
> >
> >
> > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner  wrote:
> >
> > > Hi All,
> > >
> > > Here are the meeting notes from today’s KIP meeting:
> > >
> > > 1. We agreed to keep the scope of this KIP to be producer and consumer
> > > interceptors only. Broker-side interceptor will be added later as a
> > > separate KIP. The reasons were already mentioned in this thread, but
> the
> > > summary is:
> > >  * Broker interceptor is riskier and requires careful consideration
> about
> > > overheads, whether to intercept leaders vs. leaders/replicas, what to
> do
> > on
> > > leader failover and so on.
> > >  * Broker interceptors increase monitoring resolution, but not
> including
> > it
> > > in this KIP does not reduce usefulness of producer and consumer
> > > interceptors that enable end-to-end monitoring
> > >
> > > 2. We agreed to scope ProducerInterceptor and ConsumerInterceptor
> > callbacks
> > > to minimal set of mutable API that are not dependent on producer and
> > > consumer internal implementation.
> > >
> > > ProducerInterceptor:
> > > *ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);*
> > > *void onAcknowledgement(RecordMetadata metadata, Exception exception);*
> > >
> > > ConsumerInterceptor:
> > > *ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);*
> > > *void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);*
> > >
> > > We will allow interceptors to modify ProducerRecord on producer side,
> and
> > > modify ConsumerRecords on consumer side. This will support end-to-end
> > > monitoring and auditing and support the ability to add metadata for a
> > > message. This will support Todd’s Auditing and Routing use-cases.
> > >
> > > We did not find any use-case for modifying records in onConsume()
> > callback,
> > > but decided to enable modification of consumer records for symmetry
> with
> > > onSend().
> > >
> > > 3. We agreed to ensure compatibility when/if we add new methods to
> > > ProducerInterceptor and ConsumerInterceptor by using default methods
> with
> > > an empty implementation. Ok to assume Java 8. (This is Ismael’s method
> > #2).
> > >
> > > 4. We decided not to add any callbacks to producer and consumer
> > > interceptors that will depend on internal implementation as part of
> this
> > > KIP. However, it is possible to add them later as part of another KIP
> if
> > > there are good use-cases.
> > >
> > > *Reasoning.* We did not have concrete use-cases that justified more
> > methods
> > > at this point. Some of the use-cases were for more fine-grain latency
> > > collection, which could be done with Kafka Metrics. Another use-case
> was
> > > encryption. However, there are several design options for encryption.
> One
> > > is to do per-record encryption which would require adding
> > > ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive().
> One
> > > could argue that in that case encryption could be done by adding a
> custom
> > > serializer/deserializer. Another option is to do encryption after
> message
> > > gets compressed, but there are issues that arise regarding broker doing
> > > re-compression. We decided that it is better to have that discussion
> in a
> > > separate KIP and decide that this is something we want to do with
> > > interceptors or by other means.
> > >
> > >
> > > Todd, Mayuresh and others who missed the KIP meeting, please let me
> know
> > > your thoughts on the scope we agreed on during the meeting.
> > >
> > > I will update the KIP proposal with the current decision by end of
> today.
> > >
> > > Thanks,
> > > Anna
> > >
> > 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-27 Thread Mayuresh Gharat
Calling producer.flush() flushes all the data. So this is OK. But when you
are running Mirror maker, I am not sure there is a way to flush() from
outside.


Thanks,

Mayuresh

On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin  wrote:

> Mayuresh,
>
> Regarding your use case about mirror maker. Is it good enough as long as we
> know there is no message for the topic in the producer anymore? If that is
> the case, calling producer.flush() is sufficient.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Hi Anna,
> >
> > Thanks a lot for summarizing the discussion on this kip.
> >
> > It LGTM.
> > This is really nice :
> > We decided not to add any callbacks to producer and consumer
> > interceptors that will depend on internal implementation as part of this
> > KIP.
> > *However, it is possible to add them later as part of another KIP if
> there
> > are good use-cases.*
> >
> > Do you agree with the use case I explained earlier for knowing the number
> > of records left in the RecordAccumulator for a particular topic? It might
> > be orthogonal to this KIP, but will be helpful. What do you think?
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino  wrote:
> >
> > > This looks good. As noted, having one mutable interceptor on each side
> > > allows for the use cases we can envision right now, and I think that’s
> > > going to provide a great deal of opportunity for implementing things
> like
> > > audit, especially within a multi-tenant environment. Looking forward to
> > > getting this available in the clients.
> > >
> > > Thanks!
> > >
> > > -Todd
> > >
> > >
> > > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner 
> wrote:
> > >
> > > > Hi All,
> > > >
> > > > Here are the meeting notes from today’s KIP meeting:
> > > >
> > > > 1. We agreed to keep the scope of this KIP to be producer and
> consumer
> > > > interceptors only. Broker-side interceptor will be added later as a
> > > > separate KIP. The reasons were already mentioned in this thread, but
> > the
> > > > summary is:
> > > >  * Broker interceptor is riskier and requires careful consideration
> > about
> > > > overheads, whether to intercept leaders vs. leaders/replicas, what to
> > do
> > > on
> > > > leader failover and so on.
> > > >  * Broker interceptors increase monitoring resolution, but not
> > including
> > > it
> > > > in this KIP does not reduce usefulness of producer and consumer
> > > > interceptors that enable end-to-end monitoring
> > > >
> > > > 2. We agreed to scope ProducerInterceptor and ConsumerInterceptor
> > > callbacks
> > > > to minimal set of mutable API that are not dependent on producer and
> > > > consumer internal implementation.
> > > >
> > > > ProducerInterceptor:
> > > > *ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);*
> > > > *void onAcknowledgement(RecordMetadata metadata, Exception
> exception);*
> > > >
> > > > ConsumerInterceptor:
> > > > *ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);*
> > > > *void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);*
> > > >
> > > > We will allow interceptors to modify ProducerRecord on producer side,
> > and
> > > > modify ConsumerRecords on consumer side. This will support end-to-end
> > > > monitoring and auditing and support the ability to add metadata for a
> > > > message. This will support Todd’s Auditing and Routing use-cases.
> > > >
> > > > We did not find any use-case for modifying records in onConsume()
> > > callback,
> > > > but decided to enable modification of consumer records for symmetry
> > with
> > > > onSend().
> > > >
> > > > 3. We agreed to ensure compatibility when/if we add new methods to
> > > > ProducerInterceptor and ConsumerInterceptor by using default methods
> > with
> > > > an empty implementation. Ok to assume Java 8. (This is Ismael’s
> method
> > > #2).
> > > >
> > > > 4. We decided not to add any callbacks to producer and consumer
> > > > interceptors that will depend on internal implementation as part of
> > this
> > > > KIP. However, it is possible to add them later as part of another KIP
> > if
> > > > there are good use-cases.
> > > >
> > > > *Reasoning.* We did not have concrete use-cases that justified more
> > > methods
> > > > at this point. Some of the use-cases were for more fine-grain latency
> > > > collection, which could be done with Kafka Metrics. Another use-case
> > was
> > > > encryption. However, there are several design options for encryption.
> > One
> > > > is to do per-record encryption which would require adding
> > > > ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive().
> > One
> > > > could argue that in that case encryption could be done by adding a
> > custom
> > > > serializer/deserializer. Another option is to do encryption after
> > message
> > > > gets compressed, but there are issues that 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-26 Thread Ismael Juma
Hi Anna and Neha,

I think it makes a lot of sense to try and keep the interface lean and to
add more methods later when/if there is a need. What is the current
thinking with regards to compatibility when/if we add new methods? A few
options come to mind:

1. Change the interface to an abstract class with empty implementations for
all the methods. This means that the path to adding new methods is clear.
2. Hope we have moved to Java 8 by the time we need to add new methods and
use default methods with an empty implementation for any new method (and
potentially make existing methods default methods too at that point for
consistency)
3. Introduce a new interface that inherits from the existing Interceptor
interface when we need to add new methods.

Option 1 is the easiest and it also means that interceptor users only need
to override the methods that they are interested in (more useful if the number
of methods grows). The downside is that interceptor implementations cannot
inherit from another class (a straightforward workaround is to make the
interceptor a forwarder that calls another class). Also, our existing
callbacks are interfaces, so this seems a bit inconsistent.

Option 2 may be the most appealing one as both users and ourselves retain
flexibility. The main downside is that it relies on us moving to Java 8,
which may be more than a year away potentially (if we support the last 2
Java releases).
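
A sketch of option 2; onEnqueued below is a hypothetical future callback,
shown only to illustrate the default-method mechanism:

public interface ProducerInterceptor<K, V> {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    // Hypothetical later addition: the empty default body means existing
    // implementations keep compiling and simply ignore the new callback.
    default void onEnqueued(ProducerRecord<K, V> record) { }
}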

Thoughts?

Ismael

On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede  wrote:

> Anna,
>
> I'm also in favor of including just the APIs for which we have a clear use
> case. If more use cases for finer monitoring show up in the future, we can
> always update the interface. Would you please highlight in the KIP the APIs
> that you think we have an immediate use for?
>
> Joel,
>
> Broker-side monitoring makes a lot of sense in the long term though I don't
> think it is a requirement for end-to-end monitoring. With the producer and
> consumer interceptors, you have the ability to get full
> publish-to-subscribe end-to-end monitoring. The broker interceptor
> certainly improves the resolution of monitoring but it is also a riskier
> change. I prefer an incremental approach over a big-bang and recommend
> taking baby-steps. Let's first make sure the producer/consumer interceptors
> are successful. And then come back and add the broker interceptor
> carefully.
>
> Having said that, it would be great to understand your proposal for the
> broker interceptor independently. We can either add an interceptor
> on-append or on-commit. If people want to use this for monitoring, then
> possibly on-commit might be more useful?
>
> Thanks,
> Neha
>
> On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps  wrote:
>
> > Hey Joel,
> >
> > What is the interface you are thinking of? Something like this:
> > onAppend(String topic, int partition, Records records, long time)
> > ?
> >
> > One challenge right now is that we are still using the old
> > Message/MessageSet classes on the broker which I'm not sure if we'd want
> to
> > support over the long haul but it might be okay just to create the
> records
> > instance for this interface.
> >
> > -Jay
> >
> > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy 
> wrote:
> >
> > > I'm definitely in favor of having such hooks in the produce/consume
> > > life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> > > pretty much how it was:
> > >
> > >
> >
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > > i.e., we had something similar to the interceptor proposal for various
> > > stages of the producer request. The producer provided call-backs for
> > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> > > LinkedIn we in fact did auditing within these call-backs (and not
> > > explicitly in the wrapper). Over time and with 0.8 we moved that out to
> > the
> > > wrapper libraries.
> > >
> > > On a side-note while audit and other monitoring can be done internally
> > in a
> > > convenient way I think it should be clarified that having a wrapper is
> in
> > > general not a bad idea and I would even consider it to be a
> > best-practice.
> > > Even with 0.7 we still had a wrapper library and that API has largely
> > > stayed the same and has helped protect against (sometimes backwards
> > > incompatible) changes in open source.
> > >
> > > While we are on this topic I have one comment and Anna, you may have
> > > already considered this but I don't see mention of it in the KIP:
> > >
> > > Add a custom message interceptor/validator on the broker on message
> > > arrival.
> > >
> > > We decompress and do basic validation of messages on arrival. I think
> > there
> > > is value in supporting custom validation and expand it to support
> custom
> > > on-arrival processing. Here is a specific use-case I have in mind. The
> > blog
> > > that James referenced 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-26 Thread Todd Palino
Finally got a chance to take a look at this. I won’t be able to make the
KIP meeting due to a conflict.

I’m somewhat disappointed in this proposal. I think that the explicit
exclusion of modification of the messages is short-sighted, and not
accounting for it now is going to bite us later. Jay, aren’t you the one
railing against public interfaces and how difficult they are to work with
when you don’t get them right? The “simple” change to one of these
interfaces to make it able to return a record is going to be a significant
change and is going to require all clients to rewrite their interceptors.
If we’re not willing to put in the time to think through manipulation now,
then this KIP should be shelved until we are. Implementing something
halfway is going to be worse than taking a little longer. In addition, I
don’t believe that manipulation requires anything more than interceptors to
receive the full record, and then to return it.

There are 3 use cases I can think of right now without any deep discussion
that can make use of interceptors with modification:

1. Auditing. The ability to add metadata to a message for auditing is
critical. Hostname, service name, timestamps, etc. are all pieces of data
that can be used on the other side of the pipeline to categorize messages,
determine loss and transport time, and pin down issues. You may say that
these things can just be part of the message schema, but anyone who has
worked with a multi-user data system (especially those who have been
involved with LinkedIn) know how difficult it is to maintain consistent
message schemas and to get other people to put in fields for your use.

2. Encryption. This is probably the most obvious case for record
manipulation on both sides. The ability to tie in end to end encryption is
important for data that requires external compliance (PCI, HIPAA, etc.).

3. Routing. By being able to add a bit of information about the source or
destination of a message to the metadata, you can easily construct an
intelligent mirror maker that can prevent loops. This has the opportunity
to result in significant operational savings, as you can get rid of the
need for tiered clusters in order to prevent loops in mirroring messages.

All three of these share the feature that they add metadata to messages.
With the pushback on having arbitrary metadata as an “envelope” to the
message, this is a way to provide it and make it the responsibility of the
client, and not the Kafka broker and system itself.

-Todd



On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma  wrote:

> Hi Anna and Neha,
>
> I think it makes a lot of sense to try and keep the interface lean and to
> add more methods later when/if there is a need. What is the current
> thinking with regards to compatibility when/if we add new methods? A few
> options come to mind:
>
> 1. Change the interface to an abstract class with empty implementations for
> all the methods. This means that the path to adding new methods is clear.
> 2. Hope we have moved to Java 8 by the time we need to add new methods and
> use default methods with an empty implementation for any new method (and
> potentially make existing methods default methods too at that point for
> consistency)
> 3. Introduce a new interface that inherits from the existing Interceptor
> interface when we need to add new methods.
>
> Option 1 is the easiest and it also means that interceptor users only need
> to override the methods that they are interested in (more useful if the number
> of methods grows). The downside is that interceptor implementations cannot
> inherit from another class (a straightforward workaround is to make the
> interceptor a forwarder that calls another class). Also, our existing
> callbacks are interfaces, so this seems a bit inconsistent.
>
> Option 2 may be the most appealing one as both users and ourselves retain
> flexibility. The main downside is that it relies on us moving to Java 8,
> which may be more than a year away potentially (if we support the last 2
> Java releases).
>
> Thoughts?
>
> Ismael
>
> On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede  wrote:
>
> > Anna,
> >
> > I'm also in favor of including just the APIs for which we have a clear
> use
> > case. If more use cases for finer monitoring show up in the future, we
> can
> > always update the interface. Would you please highlight in the KIP the
> APIs
> > that you think we have an immediate use for?
> >
> > Joel,
> >
> > Broker-side monitoring makes a lot of sense in the long term though I
> don't
> > think it is a requirement for end-to-end monitoring. With the producer
> and
> > consumer interceptors, you have the ability to get full
> > publish-to-subscribe end-to-end monitoring. The broker interceptor
> > certainly improves the resolution of monitoring but it is also a riskier
> > change. I prefer an incremental approach over a big-bang and recommend
> > taking baby-steps. Let's first make sure the producer/consumer
> 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-26 Thread Anna Povzner
Thanks Ismael and Todd for your feedback!

I agree about coming up with lean, but useful interfaces that will be easy
to extend later.

When we discuss the minimal set of producer and consumer interceptor API in
today’s KIP meeting (discussion item #2 in my previous email), let’s compare
two options:

*1. Minimal set of immutable API for producer and consumer interceptors*

ProducerInterceptor:
public void onSend(ProducerRecord record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);

ConsumerInterceptor:
public void onConsume(ConsumerRecords records);
public void onCommit(Map offsets);

Use-cases:
— end-to-end monitoring; custom tracing and logging


*2. Minimal set of mutable API for producer and consumer interceptors*

ProducerInterceptor:
ProducerRecord onSend(ProducerRecord record);
void onAcknowledgement(RecordMetadata metadata, Exception exception);

ConsumerInterceptor:
void onConsume(ConsumerRecords records);
void onCommit(Map offsets);

Additional use-cases to #1:
— Ability to add metadata to a message or fill in standard fields for audit
and routing.

Implications:
— Partition assignment will be done based on the modified key/value instead
of the original key/value. If the key/value transformation is not
consistent (i.e., the same input key/value does not always map to the same
modified key/value), then log compaction would not work. However, Todd’s
audit and routing use-cases will likely do consistent transformations.


*Additional callbacks (discussion item #3 in my previous email):*

If we want to support encryption, we would want to be able to modify
serialized key/value, rather than key and value objects. This will add the
following API to producer and consumer interceptors:

ProducerInterceptor:
SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord
record, SerializedKeyValue serializedKeyValue);

ConsumerInterceptor:
SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue
serializedKeyValue);
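
To make that concrete, here is a very rough sketch, assuming
SerializedKeyValue is a simple holder of the serialized key and value
bytes (the class does not exist, the signatures are simplified, and the
cipher setup is omitted and purely illustrative):

import java.security.GeneralSecurityException;
import javax.crypto.Cipher;
import org.apache.kafka.common.TopicPartition;

// Hypothetical holder assumed by the proposed callbacks above.
class SerializedKeyValue {
    final byte[] key;
    final byte[] value;
    SerializedKeyValue(byte[] key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

class EncryptingCallbacks {
    private final Cipher encrypt; // initialized elsewhere with key/IV
    private final Cipher decrypt;

    EncryptingCallbacks(Cipher encrypt, Cipher decrypt) {
        this.encrypt = encrypt;
        this.decrypt = decrypt;
    }

    // Producer side: encrypt only the serialized value, leaving the
    // serialized key intact so partition assignment is unaffected.
    SerializedKeyValue onEnqueued(TopicPartition tp, SerializedKeyValue skv) {
        try {
            return new SerializedKeyValue(skv.key, encrypt.doFinal(skv.value));
        } catch (GeneralSecurityException e) {
            throw new RuntimeException("encryption failed", e);
        }
    }

    // Consumer side: decrypt the value before deserialization.
    SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue skv) {
        try {
            return new SerializedKeyValue(skv.key, decrypt.doFinal(skv.value));
        } catch (GeneralSecurityException e) {
            throw new RuntimeException("decryption failed", e);
        }
    }
}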


I am leaning towards implementing the minimal set of immutable or mutable
interfaces, making sure that we have a compatibility plan that allows us to
add more callbacks in the future (per Ismael’s comment), and adding more
APIs later. E.g., for the encryption use-case, there could be an argument
for doing encryption after message compression vs. per-record encryption,
which could be done using the additional API above. There are also more
implications for every API that modifies records: modifying the serialized
key/value will again impact partition assignment (we would likely do that
after partition assignment), which may impact log compaction and mirror
maker partitioning.


Thanks,
Anna

On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino  wrote:

> Finally got a chance to take a look at this. I won’t be able to make the
> KIP meeting due to a conflict.
>
> I’m somewhat disappointed in this proposal. I think that the explicit
> exclusion of modification of the messages is short-sighted, and not
> accounting for it now is going to bite us later. Jay, aren’t you the one
> railing against public interfaces and how difficult they are to work with
> when you don’t get them right? The “simple” change to one of these
> interfaces to make it able to return a record is going to be a significant
> change and is going to require all clients to rewrite their interceptors.
> If we’re not willing to put the time to think through manipulation now,
> then this KIP should be shelved until we are. Implementing something
> halfway is going to be worse than taking a little longer. In addition, I
> don’t believe that manipulation requires anything more than interceptors to
> receive the full record, and then to return it.
>
> There are three use cases I can think of right now without any deep discussion
> that can make use of interceptors with modification:
>
> 1. Auditing. The ability to add metadata to a message for auditing is
> critical. Hostname, service name, timestamps, etc. are all pieces of data
> that can be used on the other side of the pipeline to categorize messages,
> determine loss and transport time, and pin down issues. You may say that
> these things can just be part of the message schema, but anyone who has
> worked with a multi-user data system (especially those who have been
> involved with LinkedIn) knows how difficult it is to maintain consistent
> message schemas and to get other people to put in fields for your use.
>
> 2. Encryption. This is probably the most obvious case for record
> manipulation on both sides. The ability to tie in end to end encryption is
> important for data that requires external compliance (PCI, HIPAA, etc.).
>
> 3. Routing. By being able to add a bit of information about the source or
> destination of a message to the metadata, you can easily construct an
> intelligent mirror maker that can prevent loops. This has the opportunity
> to result in 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-26 Thread Mayuresh Gharat
Hi,

I won't be able to make it to KIP hangout due to conflict.

Anna, here is the use case where knowing whether there are messages left in
the RecordAccumulator to be sent to the Kafka cluster for a topic is
useful.

1) Consider a pipeline :
A ---> Mirror-maker -> B

2) We have a topic T in cluster A mirrored to cluster B.

3) Now if we delete topic T in A and immediately proceed to delete the
topic in cluster B, some of the Mirror-maker machines die because at least
one of the batches in the RecordAccumulator for topic T fails to be
produced to cluster B. We have seen this happen in our clusters.


If we know that there are no more messages left in the RecordAccumulator to
be produced to cluster B, we can safely delete the topic in cluster B
without disturbing the pipeline.

Thanks,

Mayuresh

On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner  wrote:

> Thanks Ismael and Todd for your feedback!
>
> I agree about coming up with lean, but useful interfaces that will be easy
> to extend later.
>
> When we discuss the minimal set of producer and consumer interceptor API in
> today’s KIP meeting (discussion item #2 in my previous email), let’s compare
> two options:
>
> *1. Minimal set of immutable API for producer and consumer interceptors*
>
> ProducerInterceptor:
> public void onSend(ProducerRecord record);
> public void onAcknowledgement(RecordMetadata metadata, Exception
> exception);
>
> ConsumerInterceptor:
> public void onConsume(ConsumerRecords records);
> public void onCommit(Map offsets);
>
> Use-cases:
> — end-to-end monitoring; custom tracing and logging
>
>
> *2. Minimal set of mutable API for producer and consumer interceptors*
>
> ProducerInterceptor:
> ProducerRecord onSend(ProducerRecord record);
> void onAcknowledgement(RecordMetadata metadata, Exception exception);
>
> ConsumerInterceptor:
> void onConsume(ConsumerRecords records);
> void onCommit(Map offsets);
>
> Additional use-cases to #1:
> — Ability to add metadata to a message or fill in standard fields for audit
> and routing.
>
> Implications:
> — Partition assignment will be done based on the modified key/value instead
> of the original key/value. If the key/value transformation is not
> consistent (i.e., the same input key/value does not always map to the same
> modified key/value), then log compaction would not work. However, Todd’s
> audit and routing use-cases will likely do consistent transformations.
>
>
> *Additional callbacks (discussion item #3 in my previous email):*
>
> If we want to support encryption, we would want to be able to modify
> serialized key/value, rather than key and value objects. This will add the
> following API to producer and consumer interceptors:
>
> ProducerInterceptor:
> SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord
> record, SerializedKeyValue serializedKeyValue);
>
> ConsumerInterceptor:
> SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue
> serializedKeyValue);
>
>
> I am leaning towards implementing the minimal set of immutable or mutable
> interfaces, making sure that we have a compatibility plan that allows us to
> add more callbacks in the future (per Ismael’s comment), and adding more
> APIs later. E.g., for the encryption use-case, there could be an argument
> for doing encryption after message compression vs. per-record encryption,
> which could be done using the additional API above. There are also more
> implications for every API that modifies records: modifying the serialized
> key/value will again impact partition assignment (we would likely do that
> after partition assignment), which may impact log compaction and mirror
> maker partitioning.
>
>
> Thanks,
> Anna
>
> On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino  wrote:
>
> > Finally got a chance to take a look at this. I won’t be able to make the
> > KIP meeting due to a conflict.
> >
> > I’m somewhat disappointed in this proposal. I think that the explicit
> > exclusion of modification of the messages is short-sighted, and not
> > accounting for it now is going to bite us later. Jay, aren’t you the one
> > railing against public interfaces and how difficult they are to work with
> > when you don’t get them right? The “simple” change to one of these
> > interfaces to make it able to return a record is going to be a
> significant
> > change and is going to require all clients to rewrite their interceptors.
> > If we’re not willing to put the time to think through manipulation now,
> > then this KIP should be shelved until we are. Implementing something
> > halfway is going to be worse than taking a little longer. In addition, I
> > don’t believe that manipulation requires anything more than interceptors
> to
> > receive the full record, and then to return it.
> >
> > There are three use cases I can think of right now without any deep discussion
> > that can make use of interceptors with modification:
> >

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-26 Thread Anna Povzner
Hi All,

Here are the meeting notes from today’s KIP meeting:

1. We agreed to keep the scope of this KIP to be producer and consumer
interceptors only. Broker-side interceptor will be added later as a
separate KIP. The reasons were already mentioned in this thread, but the
summary is:
 * A broker interceptor is riskier and requires careful consideration of
overheads, whether to intercept leaders vs. leaders/replicas, what to do on
leader failover, and so on.
 * Broker interceptors increase monitoring resolution, but not including
them in this KIP does not reduce the usefulness of producer and consumer
interceptors that enable end-to-end monitoring.

2. We agreed to scope the ProducerInterceptor and ConsumerInterceptor
callbacks to a minimal set of mutable APIs that are not dependent on
producer and consumer internal implementation.

ProducerInterceptor:
*ProducerRecord onSend(ProducerRecord record);*
*void onAcknowledgement(RecordMetadata metadata, Exception exception);*

ConsumerInterceptor:
*ConsumerRecords onConsume(ConsumerRecords records);*
*void onCommit(Map offsets);*

We will allow interceptors to modify the ProducerRecord on the producer
side, and ConsumerRecords on the consumer side. This will support
end-to-end monitoring and auditing, as well as the ability to add metadata
to a message, covering Todd’s auditing and routing use-cases.

We did not find any use-case for modifying records in the onConsume() callback,
but decided to enable modification of consumer records for symmetry with
onSend().
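
As an illustration of the monitoring side under this API, here is a
minimal sketch of a pass-through consumer interceptor that counts records
per topic (the generic parameters and the configure()/close() lifecycle
methods are my assumptions about the final interface):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CountingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        for (ConsumerRecord<K, V> record : records)
            counts.merge(record.topic(), 1L, Long::sum);
        return records; // pass records through unmodified
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Counts could be emitted to an audit topic here, keyed by the
        // committed offsets.
    }

    @Override
    public void close() {}
}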

3. We agreed to ensure compatibility when/if we add new methods to
ProducerInterceptor and ConsumerInterceptor by using default methods with
an empty implementation. It is OK to assume Java 8. (This is Ismael’s option #2).
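
To sketch what that compatibility path looks like: a hypothetical future
callback (onSerialized() below is invented purely for illustration) can be
added as a default method without breaking existing implementations:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

public interface ProducerInterceptor<K, V> {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    // Added in a later release; existing interceptors keep compiling
    // because the default body is an empty no-op.
    default void onSerialized(TopicPartition tp, byte[] key, byte[] value) {
    }
}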

4. We decided not to add any callbacks to producer and consumer
interceptors that will depend on internal implementation as part of this
KIP. However, it is possible to add them later as part of another KIP if
there are good use-cases.

*Reasoning.* We did not have concrete use-cases that justified more methods
at this point. Some of the use-cases were for finer-grained latency
collection, which could be done with Kafka Metrics. Another use-case was
encryption. However, there are several design options for encryption. One
is to do per-record encryption, which would require adding
ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One
could argue that in that case encryption could be done by adding a custom
serializer/deserializer. Another option is to do encryption after the
message gets compressed, but there are issues that arise regarding the
broker doing re-compression. We decided that it is better to have that
discussion in a separate KIP and decide then whether this is something we
want to do with interceptors or by other means.


Todd, Mayuresh and others who missed the KIP meeting, please let me know
your thoughts on the scope we agreed on during the meeting.

I will update the KIP proposal with the current decision by end of today.

Thanks,
Anna


On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Hi,
>
> I won't be able to make it to KIP hangout due to conflict.
>
> Anna, here is the use case where knowing if there are messages in the
> RecordAccumulator left to be sent to the Kafka cluster for a topic is
> useful.
>
> 1) Consider a pipeline :
> A ---> Mirror-maker -> B
>
> 2) We have a topic T in cluster A mirrored to cluster B.
>
> 3) Now if we delete topic T in A and immediately proceed to delete the
> topic in cluster B, some of the Mirror-maker machines die because at
> least one of the batches in the RecordAccumulator for topic T fails to be
> produced to cluster B. We have seen this happen in our clusters.
>
>
> If we know that there are no more messages left in the RecordAccumulator to
> be produced to cluster B, we can safely delete the topic in cluster B
> without disturbing the pipeline.
>
> Thanks,
>
> Mayuresh
>
> On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner  wrote:
>
> > Thanks Ismael and Todd for your feedback!
> >
> > I agree about coming up with lean, but useful interfaces that will be
> easy
> > to extend later.
> >
> > When we discuss the minimal set of producer and consumer interceptor API
> in
> > today’s KIP meeting (discussion item #2 in my previous email), let’s
> compare
> > two options:
> >
> > *1. Minimal set of immutable API for producer and consumer interceptors*
> >
> > ProducerInterceptor:
> > public void onSend(ProducerRecord record);
> > public void onAcknowledgement(RecordMetadata metadata, Exception
> > exception);
> >
> > ConsumerInterceptor:
> > public void onConsume(ConsumerRecords records);
> > public void onCommit(Map offsets);
> >
> > Use-cases:
> > — end-to-end monitoring; custom tracing and logging
> >
> >
> > *2. Minimal set of mutable API for producer and consumer interceptors*
> >

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-26 Thread Todd Palino
This looks good. As noted, having one mutable interceptor on each side
allows for the use cases we can envision right now, and I think that’s
going to provide a great deal of opportunity for implementing things like
audit, especially within a multi-tenant environment. Looking forward to
getting this available in the clients.

Thanks!

-Todd


On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner  wrote:

> Hi All,
>
> Here are the meeting notes from today’s KIP meeting:
>
> 1. We agreed to keep the scope of this KIP to be producer and consumer
> interceptors only. Broker-side interceptor will be added later as a
> separate KIP. The reasons were already mentioned in this thread, but the
> summary is:
>  * Broker interceptor is riskier and requires careful consideration about
> overheads, whether to intercept leaders vs. leaders/replicas, what to do on
> leader failover and so on.
>  * Broker interceptors increase monitoring resolution, but not including it
> in this KIP does not reduce usefulness of producer and consumer
> interceptors that enable end-to-end monitoring
>
> 2. We agreed to scope ProducerInterceptor and ConsumerInterceptor callbacks
> to a minimal set of mutable APIs that are not dependent on producer and
> consumer internal implementation.
>
> ProducerInterceptor:
> *ProducerRecord onSend(ProducerRecord record);*
> *void onAcknowledgement(RecordMetadata metadata, Exception exception);*
>
> ConsumerInterceptor:
> *ConsumerRecords onConsume(ConsumerRecords records);*
> *void onCommit(Map offsets);*
>
> We will allow interceptors to modify ProducerRecord on producer side, and
> modify ConsumerRecords on consumer side. This will support end-to-end
> monitoring and auditing and support the ability to add metadata for a
> message. This will support Todd’s Auditing and Routing use-cases.
>
> We did not find any use-case for modifying records in onConsume() callback,
> but decided to enable modification of consumer records for symmetry with
> onSend().
>
> 3. We agreed to ensure compatibility when/if we add new methods to
> ProducerInterceptor and ConsumerInterceptor by using default methods with
> an empty implementation. Ok to assume Java 8. (This is Ismael’s method #2).
>
> 4. We decided not to add any callbacks to producer and consumer
> interceptors that will depend on internal implementation as part of this
> KIP. However, it is possible to add them later as part of another KIP if
> there are good use-cases.
>
> *Reasoning.* We did not have concrete use-cases that justified more methods
> at this point. Some of the use-cases were for more fine-grain latency
> collection, which could be done with Kafka Metrics. Another use-case was
> encryption. However, there are several design options for encryption. One
> is to do per-record encryption which would require adding
> ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One
> could argue that in that case encryption could be done by adding a custom
> serializer/deserializer. Another option is to do encryption after message
> gets compressed, but there are issues that arise regarding broker doing
> re-compression. We decided that it is better to have that discussion in a
> separate KIP and decide that this is something we want to do with
> interceptors or by other means.
>
>
> Todd, Mayuresh and others who missed the KIP meeting, please let me know
> your thoughts on the scope we agreed on during the meeting.
>
> I will update the KIP proposal with the current decision by end of today.
>
> Thanks,
> Anna
>
>
> On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > Hi,
> >
> > I won't be able to make it to KIP hangout due to conflict.
> >
> > Anna, here is the use case where knowing if there are messages in the
> > RecordAccumulator left to be sent to the Kafka cluster for a topic is
> > useful.
> >
> > 1) Consider a pipeline :
> > A ---> Mirror-maker -> B
> >
> > 2) We have a topic T in cluster A mirrored to cluster B.
> >
> > 3) Now if we delete topic T in A and immediately proceed to delete the
> > topic in cluster B, some of the Mirror-maker machines die because at
> > least one of the batches in the RecordAccumulator for topic T fails to be
> > produced to cluster B. We have seen this happen in our clusters.
> >
> >
> > If we know that there are no more messages left in the RecordAccumulator
> to
> > be produced to cluster B, we can safely delete the topic in cluster B
> > without disturbing the pipeline.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner 
> wrote:
> >
> > > Thanks Ismael and Todd for your feedback!
> > >
> > > I agree about coming up with lean, but useful interfaces that will be
> > easy
> > > to extend later.
> > >
> > > When we discuss the minimal set of producer and consumer interceptor
> API
> > in
> > > today’s KIP meeting (discussion item #2 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-26 Thread Mayuresh Gharat
Hi Anna,

Thanks a lot for summarizing the discussion on this kip.

It LGTM.
This is really nice:
We decided not to add any callbacks to producer and consumer
interceptors that will depend on internal implementation as part of this
KIP.
*However, it is possible to add them later as part of another KIP if there
are good use-cases.*

Do you agree with the use case I explained earlier for knowing the number
of records left in the RecordAccumulator for a particular topic? It might
be orthogonal to this KIP, but it would be helpful. What do you think?

Thanks,

Mayuresh


On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino  wrote:

> This looks good. As noted, having one mutable interceptor on each side
> allows for the use cases we can envision right now, and I think that’s
> going to provide a great deal of opportunity for implementing things like
> audit, especially within a multi-tenant environment. Looking forward to
> getting this available in the clients.
>
> Thanks!
>
> -Todd
>
>
> On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner  wrote:
>
> > Hi All,
> >
> > Here are the meeting notes from today’s KIP meeting:
> >
> > 1. We agreed to keep the scope of this KIP to be producer and consumer
> > interceptors only. Broker-side interceptor will be added later as a
> > separate KIP. The reasons were already mentioned in this thread, but the
> > summary is:
> >  * Broker interceptor is riskier and requires careful consideration about
> > overheads, whether to intercept leaders vs. leaders/replicas, what to do
> on
> > leader failover and so on.
> >  * Broker interceptors increase monitoring resolution, but not including
> it
> > in this KIP does not reduce usefulness of producer and consumer
> > interceptors that enable end-to-end monitoring
> >
> > 2. We agreed to scope ProducerInterceptor and ConsumerInterceptor
> callbacks
> > to a minimal set of mutable APIs that are not dependent on producer and
> > consumer internal implementation.
> >
> > ProducerInterceptor:
> > *ProducerRecord onSend(ProducerRecord record);*
> > *void onAcknowledgement(RecordMetadata metadata, Exception exception);*
> >
> > ConsumerInterceptor:
> > *ConsumerRecords onConsume(ConsumerRecords records);*
> > *void onCommit(Map offsets);*
> >
> > We will allow interceptors to modify ProducerRecord on producer side, and
> > modify ConsumerRecords on consumer side. This will support end-to-end
> > monitoring and auditing and support the ability to add metadata for a
> > message. This will support Todd’s Auditing and Routing use-cases.
> >
> > We did not find any use-case for modifying records in onConsume()
> callback,
> > but decided to enable modification of consumer records for symmetry with
> > onSend().
> >
> > 3. We agreed to ensure compatibility when/if we add new methods to
> > ProducerInterceptor and ConsumerInterceptor by using default methods with
> > an empty implementation. Ok to assume Java 8. (This is Ismael’s method
> #2).
> >
> > 4. We decided not to add any callbacks to producer and consumer
> > interceptors that will depend on internal implementation as part of this
> > KIP. However, it is possible to add them later as part of another KIP if
> > there are good use-cases.
> >
> > *Reasoning.* We did not have concrete use-cases that justified more
> methods
> > at this point. Some of the use-cases were for more fine-grain latency
> > collection, which could be done with Kafka Metrics. Another use-case was
> > encryption. However, there are several design options for encryption. One
> > is to do per-record encryption which would require adding
> > ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One
> > could argue that in that case encryption could be done by adding a custom
> > serializer/deserializer. Another option is to do encryption after message
> > gets compressed, but there are issues that arise regarding broker doing
> > re-compression. We decided that it is better to have that discussion in a
> > separate KIP and decide that this is something we want to do with
> > interceptors or by other means.
> >
> >
> > Todd, Mayuresh and others who missed the KIP meeting, please let me know
> > your thoughts on the scope we agreed on during the meeting.
> >
> > I will update the KIP proposal with the current decision by end of today.
> >
> > Thanks,
> > Anna
> >
> >
> > On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I won't be able to make it to KIP hangout due to conflict.
> > >
> > > Anna, here is the use case where knowing if there are messages in the
> > > RecordAccumulator left to be sent to the Kafka cluster for a topic is
> > > useful.
> > >
> > > 1) Consider a pipeline :
> > > A ---> Mirror-maker -> B
> > >
> > > 2) We have a topic T in cluster A mirrored to cluster B.
> > >
> > > 3) Now if we delete topic T in A and immediately proceed to delete 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Neha Narkhede
Anna,

I'm also in favor of including just the APIs for which we have a clear use
case. If more use cases for finer monitoring show up in the future, we can
always update the interface. Would you please highlight in the KIP the APIs
that you think we have an immediate use for?

Joel,

Broker-side monitoring makes a lot of sense in the long term though I don't
think it is a requirement for end-to-end monitoring. With the producer and
consumer interceptors, you have the ability to get full
publish-to-subscribe end-to-end monitoring. The broker interceptor
certainly improves the resolution of monitoring but it is also a riskier
change. I prefer an incremental approach over a big-bang and recommend
taking baby-steps. Let's first make sure the producer/consumer interceptors
are successful. And then come back and add the broker interceptor
carefully.

Having said that, it would be great to understand your proposal for the
broker interceptor independently. We can either add an interceptor
on-append or on-commit. If people want to use this for monitoring, then
possibly on-commit might be more useful?

Thanks,
Neha

On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps  wrote:

> Hey Joel,
>
> What is the interface you are thinking of? Something like this:
> onAppend(String topic, int partition, Records records, long time)
> ?
>
> One challenge right now is that we are still using the old
> Message/MessageSet classes on the broker which I'm not sure if we'd want to
> support over the long haul but it might be okay just to create the records
> instance for this interface.
>
> -Jay
>
> On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy  wrote:
>
> > I'm definitely in favor of having such hooks in the produce/consume
> > life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> > pretty much how it was:
> >
> >
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > i.e., we had something similar to the interceptor proposal for various
> > stages of the producer request. The producer provided call-backs for
> > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> > LinkedIn we in fact did auditing within these call-backs (and not
> > explicitly in the wrapper). Over time and with 0.8 we moved that out to
> the
> > wrapper libraries.
> >
> > On a side-note while audit and other monitoring can be done internally
> in a
> > convenient way I think it should be clarified that having a wrapper is in
> > general not a bad idea and I would even consider it to be a
> best-practice.
> > Even with 0.7 we still had a wrapper library and that API has largely
> > stayed the same and has helped protect against (sometimes backwards
> > incompatible) changes in open source.
> >
> > While we are on this topic I have one comment and Anna, you may have
> > already considered this but I don't see mention of it in the KIP:
> >
> > Add a custom message interceptor/validator on the broker on message
> > arrival.
> >
> > We decompress and do basic validation of messages on arrival. I think
> there
> > is value in supporting custom validation and expanding it to support custom
> > on-arrival processing. Here is a specific use-case I have in mind. The
> blog
> > that James referenced describes our auditing infrastructure. In order to
> > audit the Kafka cluster itself we need to run a "console auditor" service
> > that consumes everything and spits out audit events back to the cluster.
> I
> > would prefer not having to run this service because:
> >
> >- Well, it is one more service that we have to run and monitor
> >- Consuming everything takes up bandwidth which can be avoided
> >- The console auditor consumer itself can lag and cause temporary
> audit
> >discrepancies
> >
> > One way we can mitigate this is by having mirror-makers in between
> clusters
> > emit audit events. The problem is that the very last cluster in the
> > pipeline will not have any audit which is why we need to have something
> to
> > audit the cluster.
> >
> > If we had a custom message validator then the audit can be done
> on-arrival
> > and we won't need a console auditor.
> >
> > One potential issue in this approach and any elaborate on-arrival
> > processing for that matter is that you may need to deserialize the
> message
> > as well which can drive up produce request handling times. However I'm
> not
> > terribly concerned about that especially if the audit header can be
> > separated out easily or even deserialized partially as this Avro thread
> > touches on
> >
> >
> http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> >
> > Thanks,
> >
> > Joel
> >
> > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Nice KIP. Excellent idea.
> > > Was just thinking if we can add onDequeued() to the ProducerInterceptor
> > > interface. Since 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Anna Povzner
Thanks everyone for your comments so far!

Since we have a KIP meeting tomorrow, here is the list of items I propose
to discuss in the meeting based on everyone's comments.

*1. Should we add broker-side interceptor to this KIP?*
Pros: Improves resolution of monitoring
Cons: Riskier and bigger change
Current proposal: Do not include broker-side interceptor in KIP-42, but
create a separate broker interceptor KIP later.

*2. Minimal set of ProducerInterceptor and ConsumerInterceptor API*
These are the APIs required for end-to-end tracing and monitoring that are
*not* dependent on the producer and consumer implementation.

Here is the proposal to start the discussion:

*ProducerInterceptor:*
public void onSend(ProducerRecord record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);

*ConsumerInterceptor:*
public void onConsume(ConsumerRecords records);
public void onCommit(Map offsets);

These APIs support different possible requirements for what end-to-end
means -- e.g., from sending to the producer to delivering to the consumer
client, or only accounting for messages that were successfully produced
and/or messages that we know were handled by an application.
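
For reference, the way an application would enable these without touching
its send path is through client configuration; a sketch, assuming an
interceptor list config key along the lines of "interceptor.classes" (the
exact key and the interceptor class name here are illustrative, not
settled):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

public class InterceptorConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Comma-separated list of interceptor implementations to load.
        props.put("interceptor.classes", "com.example.MonitoringInterceptor");
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}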

*3. Should we add more callbacks to ProducerInterceptor and
ConsumerInterceptor?*

Additional callbacks will be more dependent on implementation, and
therefore, changing internal producer or consumer implementation later may
cause changes in interceptor interfaces.

List of additional callbacks that were mentioned so far:
*1. ProducerInterceptor.onEnqueued(TopicPartition tp, ProducerRecord
record, SerializedKeyValue serializedKeyValue);*
*2. ProducerInterceptor.onDequeued(TopicPartition tp, long appendTime, int
attempts, boolean moreQueued);*
*3. ConsumerInterceptor.onReceive(TopicPartition tp, SerializedKeyValue
serializedKeyValue);*

Use-cases:
1. Per-message, finer-grained latencies (serialization overhead, time spent
in the record accumulator, latency inside the consumer). Most of these
latencies are available as averages/maxima via Kafka Metrics (or we could
add corresponding Kafka Metrics). One potential advantage is collecting
these metrics per message, e.g. for calculating latency percentiles rather
than averages/maxima. However, it is not clear whether that much detail is
actually needed by any real use-cases. Needs discussion.
2. Any other use-cases?

Thanks,
Anna

On Mon, Jan 25, 2016 at 9:59 PM, Neha Narkhede  wrote:

> Anna,
>
> I'm also in favor of including just the APIs for which we have a clear use
> case. If more use cases for finer monitoring show up in the future, we can
> always update the interface. Would you please highlight in the KIP the APIs
> that you think we have an immediate use for?
>
> Joel,
>
> Broker-side monitoring makes a lot of sense in the long term though I don't
> think it is a requirement for end-to-end monitoring. With the producer and
> consumer interceptors, you have the ability to get full
> publish-to-subscribe end-to-end monitoring. The broker interceptor
> certainly improves the resolution of monitoring but it is also a riskier
> change. I prefer an incremental approach over a big-bang and recommend
> taking baby-steps. Let's first make sure the producer/consumer interceptors
> are successful. And then come back and add the broker interceptor
> carefully.
>
> Having said that, it would be great to understand your proposal for the
> broker interceptor independently. We can either add an interceptor
> on-append or on-commit. If people want to use this for monitoring, then
> possibly on-commit might be more useful?
>
> Thanks,
> Neha
>
> On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps  wrote:
>
> > Hey Joel,
> >
> > What is the interface you are thinking of? Something like this:
> > onAppend(String topic, int partition, Records records, long time)
> > ?
> >
> > One challenge right now is that we are still using the old
> > Message/MessageSet classes on the broker which I'm not sure if we'd want
> to
> > support over the long haul but it might be okay just to create the
> records
> > instance for this interface.
> >
> > -Jay
> >
> > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy 
> wrote:
> >
> > > I'm definitely in favor of having such hooks in the produce/consume
> > > life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> > > pretty much how it was:
> > >
> > >
> >
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > > i.e., we had something similar to the interceptor proposal for various
> > > stages of the producer request. The producer provided call-backs for
> > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> > > LinkedIn we in fact did auditing within these call-backs (and not
> > > explicitly in the wrapper). Over time and with 0.8 we moved that out to
> > the
> > > wrapper libraries.
> > >
> > > 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Todd Palino
Great idea. I’ve been talking about this for 2 years, and I’m glad someone
is finally picking it up. Will take a look at the KIP at some point shortly.

-Todd


On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps  wrote:

> Hey Becket,
>
> Yeah this is really similar to the callback. The difference is really in
> who sets the behavior. The idea of the interceptor is that it doesn't
> require any code change in apps so you can globally add behavior to your
> Kafka usage without changing app code. Whereas the callback is added by the
> app. The idea is to kind of obviate the need for the wrapper code that e.g.
> LinkedIn maintains to hold this kind of stuff.
>
> -Jay
>
> On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin  wrote:
>
> > This could be a useful feature. And I think there are some use cases to
> > mutate the data like rejected alternative one mentioned.
> >
> > I am wondering if there is functional overlapping between
> > ProducerInterceptor.onAcknowledgement() and the producer callback? I can
> > see that the Callback could be a per record setting while
> > onAcknowledgement() is a producer level setting. Other than that, is
> there
> > any difference between them?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede 
> wrote:
> >
> > > James,
> > >
> > > That is one of the many monitoring use cases for the interceptor
> > interface.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng  wrote:
> > >
> > > > Anna,
> > > >
> > > > I'm trying to understand a concrete use case. It sounds like producer
> > > > interceptors could be used to implement part of LinkedIn's Kafka
> Audit
> > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale
> > > >
> > > > Part of that is done by a wrapper library around the kafka producer
> > that
> > > > keeps a count of the number of messages produced, and then sends that
> > > count
> > > > to a side-topic. It sounds like the producer interceptors could
> > possibly
> > > be
> > > > used to implement that?
> > > >
> > > > -James
> > > >
> > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner 
> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > I just created a KIP-42 for adding producer and consumer
> interceptors
> > > for
> > > > > intercepting messages at different points on producer and consumer.
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > > >
> > > > > Comments and suggestions are welcome!
> > > > >
> > > > > Thanks,
> > > > > Anna
> > > >
> > > >
> > > > 
> > > >
> > > > This email and any attachments may contain confidential and
> privileged
> > > > material for the sole use of the intended recipient. Any review,
> > copying,
> > > > or distribution of this email (or any attachments) by others is
> > > prohibited.
> > > > If you are not the intended recipient, please contact the sender
> > > > immediately and permanently delete this email and any attachments. No
> > > > employee or agent of TiVo Inc. is authorized to conclude any binding
> > > > agreement on behalf of TiVo Inc. by email. Binding agreements with
> TiVo
> > > > Inc. may only be made by a signed written agreement.
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
>



-- 
*—-*
*Todd Palino*
Staff Site Reliability Engineer
Data Infrastructure Streaming



linkedin.com/in/toddpalino


Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Joel Koshy
I'm definitely in favor of having such hooks in the produce/consume
life-cycle. Not sure if people remember this but in Kafka 0.7 this was
pretty much how it was:
https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
i.e., we had something similar to the interceptor proposal for various
stages of the producer request. The producer provided call-backs for
beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
LinkedIn we in fact did auditing within these call-backs (and not
explicitly in the wrapper). Over time and with 0.8 we moved that out to the
wrapper libraries.

On a side note: while audit and other monitoring can be done internally in
a convenient way, I think it should be clarified that having a wrapper is
in general not a bad idea, and I would even consider it a best practice.
Even with 0.7 we still had a wrapper library and that API has largely
stayed the same and has helped protect against (sometimes backwards
incompatible) changes in open source.

While we are on this topic, I have one comment. Anna, you may have already
considered this, but I don't see it mentioned in the KIP:

Add a custom message interceptor/validator on the broker on message arrival.

We decompress and do basic validation of messages on arrival. I think there
is value in supporting custom validation and expanding it to support custom
on-arrival processing. Here is a specific use-case I have in mind. The blog
that James referenced describes our auditing infrastructure. In order to
audit the Kafka cluster itself we need to run a "console auditor" service
that consumes everything and spits out audit events back to the cluster. I
would prefer not having to run this service because:

   - Well, it is one more service that we have to run and monitor
   - Consuming everything takes up bandwidth which can be avoided
   - The console auditor consumer itself can lag and cause temporary audit
   discrepancies

One way we can mitigate this is by having mirror-makers in between clusters
emit audit events. The problem is that the very last cluster in the
pipeline will not have any audit which is why we need to have something to
audit the cluster.

If we had a custom message validator then the audit can be done on-arrival
and we won't need a console auditor.

One potential issue in this approach and any elaborate on-arrival
processing for that matter is that you may need to deserialize the message
as well which can drive up produce request handling times. However I'm not
terribly concerned about that especially if the audit header can be
separated out easily or even deserialized partially as this Avro thread
touches on
http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
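
For what it's worth, the hook I have in mind is small; a purely
speculative sketch, along the lines of the onAppend() signature discussed
elsewhere in this thread (the interface name is invented, and Records here
is the clients' record container, standing in for whatever the broker
would actually expose):

import org.apache.kafka.common.record.Records;

public interface BrokerMessageInterceptor {
    // Invoked on the leader after decompression and basic validation,
    // before the message set is appended to the log.
    void onAppend(String topic, int partition, Records records, long timeMs);
}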

Thanks,

Joel

On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Nice KIP. Excellent idea.
> Was just thinking if we can add onDequed() to the ProducerIterceptor
> interface. Since we have the onEnqueued(), it will help the client or the
> tools to know how much time the message spent in the RecordAccumulator.
> Also an API to check if there are any messages left for a particular topic
> in the RecordAccumulator would help.
>
> Thanks,
>
> Mayuresh
>
> On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino  wrote:
>
> > Great idea. I’ve been talking about this for 2 years, and I’m glad
> someone
> > is finally picking it up. Will take a look at the KIP at some point
> > shortly.
> >
> > -Todd
> >
> >
> > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps  wrote:
> >
> > > Hey Becket,
> > >
> > > Yeah this is really similar to the callback. The difference is really
> in
> > > who sets the behavior. The idea of the interceptor is that it doesn't
> > > require any code change in apps so you can globally add behavior to
> your
> > > Kafka usage without changing app code. Whereas the callback is added by
> > the
> > > app. The idea is to kind of obviate the need for the wrapper code that
> > e.g.
> > > LinkedIn maintains to hold this kind of stuff.
> > >
> > > -Jay
> > >
> > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin 
> > wrote:
> > >
> > > > This could be a useful feature. And I think there are some use cases
> to
> > > > mutate the data like rejected alternative one mentioned.
> > > >
> > > > I am wondering if there is functional overlapping between
> > > > ProducerInterceptor.onAcknowledgement() and the producer callback? I
> > can
> > > > see that the Callback could be a per record setting while
> > > > onAcknowledgement() is a producer level setting. Other than that, is
> > > there
> > > > any difference between them?
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede 
> > > wrote:
> > > >
> > > > > James,
> > > > >
> > > > > That is one of the many monitoring use cases 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Jay Kreps
Hey Becket,

Yeah this is really similar to the callback. The difference is really in
who sets the behavior. The idea of the interceptor is that it doesn't
require any code change in apps so you can globally add behavior to your
Kafka usage without changing app code. Whereas the callback is added by the
app. The idea is to kind of obviate the need for the wrapper code that e.g.
LinkedIn maintains to hold this kind of stuff.

-Jay

On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin  wrote:

> This could be a useful feature. And I think there are some use cases to
> mutate the data like rejected alternative one mentioned.
>
> I am wondering if there is functional overlapping between
> ProducerInterceptor.onAcknowledgement() and the producer callback? I can
> see that the Callback could be a per record setting while
> onAcknowledgement() is a producer level setting. Other than that, is there
> any difference between them?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede  wrote:
>
> > James,
> >
> > That is one of the many monitoring use cases for the interceptor
> interface.
> >
> > Thanks,
> > Neha
> >
> > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng  wrote:
> >
> > > Anna,
> > >
> > > I'm trying to understand a concrete use case. It sounds like producer
> > > interceptors could be used to implement part of LinkedIn's Kafka Audit
> > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale
> > >
> > > Part of that is done by a wrapper library around the kafka producer
> that
> > > keeps a count of the number of messages produced, and then sends that
> > count
> > > to a side-topic. It sounds like the producer interceptors could
> possibly
> > be
> > > used to implement that?
> > >
> > > -James
> > >
> > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner  wrote:
> > > >
> > > > Hi,
> > > >
> > > > I just created a KIP-42 for adding producer and consumer interceptors
> > for
> > > > intercepting messages at different points on producer and consumer.
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > >
> > > > Comments and suggestions are welcome!
> > > >
> > > > Thanks,
> > > > Anna
> > >
> > >
> > > 
> > >
> > > This email and any attachments may contain confidential and privileged
> > > material for the sole use of the intended recipient. Any review,
> copying,
> > > or distribution of this email (or any attachments) by others is
> > prohibited.
> > > If you are not the intended recipient, please contact the sender
> > > immediately and permanently delete this email and any attachments. No
> > > employee or agent of TiVo Inc. is authorized to conclude any binding
> > > agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> > > Inc. may only be made by a signed written agreement.
> > >
> >
> >
> >
> > --
> > Thanks,
> > Neha
> >
>


Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Mayuresh Gharat
Nice KIP. Excellent idea.
Was just thinking if we can add onDequeued() to the ProducerInterceptor
interface. Since we have the onEnqueued(), it will help the client or the
tools to know how much time the message spent in the RecordAccumulator.
Also an API to check if there are any messages left for a particular topic
in the RecordAccumulator would help.

Thanks,

Mayuresh

On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino  wrote:

> Great idea. I’ve been talking about this for 2 years, and I’m glad someone
> is finally picking it up. Will take a look at the KIP at some point
> shortly.
>
> -Todd
>
>
> On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps  wrote:
>
> > Hey Becket,
> >
> > Yeah this is really similar to the callback. The difference is really in
> > who sets the behavior. The idea of the interceptor is that it doesn't
> > require any code change in apps so you can globally add behavior to your
> > Kafka usage without changing app code. Whereas the callback is added by
> the
> > app. The idea is to kind of obviate the need for the wrapper code that
> e.g.
> > LinkedIn maintains to hold this kind of stuff.
> >
> > -Jay
> >
> > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin 
> wrote:
> >
> > > This could be a useful feature. And I think there are some use cases to
> > > mutate the data like rejected alternative one mentioned.
> > >
> > > I am wondering if there is functional overlapping between
> > > ProducerInterceptor.onAcknowledgement() and the producer callback? I
> can
> > > see that the Callback could be a per record setting while
> > > onAcknowledgement() is a producer level setting. Other than that, is
> > there
> > > any difference between them?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede 
> > wrote:
> > >
> > > > James,
> > > >
> > > > That is one of the many monitoring use cases for the interceptor
> > > interface.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng 
> wrote:
> > > >
> > > > > Anna,
> > > > >
> > > > > I'm trying to understand a concrete use case. It sounds like
> producer
> > > > > interceptors could be used to implement part of LinkedIn's Kafka
> > Audit
> > > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale
> > > > >
> > > > > Part of that is done by a wrapper library around the kafka producer
> > > that
> > > > > keeps a count of the number of messages produced, and then sends
> that
> > > > count
> > > > > to a side-topic. It sounds like the producer interceptors could
> > > possibly
> > > > be
> > > > > used to implement that?
> > > > >
> > > > > -James
> > > > >
> > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner 
> > wrote:
> > > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I just created a KIP-42 for adding producer and consumer
> > interceptors
> > > > for
> > > > > > intercepting messages at different points on producer and
> consumer.
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > > > >
> > > > > > Comments and suggestions are welcome!
> > > > > >
> > > > > > Thanks,
> > > > > > Anna
> > > > >
> > > > >
> > > > > 
> > > > >
> > > > > This email and any attachments may contain confidential and
> > privileged
> > > > > material for the sole use of the intended recipient. Any review,
> > > copying,
> > > > > or distribution of this email (or any attachments) by others is
> > > > prohibited.
> > > > > If you are not the intended recipient, please contact the sender
> > > > > immediately and permanently delete this email and any attachments.
> No
> > > > > employee or agent of TiVo Inc. is authorized to conclude any
> binding
> > > > > agreement on behalf of TiVo Inc. by email. Binding agreements with
> > TiVo
> > > > > Inc. may only be made by a signed written agreement.
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Neha
> > > >
> > >
> >
>
>
>
> --
> *—-*
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Anna Povzner
Hi Mayuresh,

I agree that an onDequeued() callback in the ProducerInterceptor would be useful.
The API of this callback needs discussion. I propose the following API:

public void onDequeued(TopicPartition tp, long appendTime, int attempts,
boolean moreQueued);

This callback will be called for every record in a RecordBatch from the
RecordAccumulator.drain() method. This API will require keeping the
appendTime of every record in RecordBatch.Thunk. An alternative way for the
interceptor to calculate time spent in the accumulator is to record the
append time itself in onEnqueued() and then record the dequeue time in
onDequeued(). However, there is no way to match which call of onDequeued()
corresponds to onEnqueued(), since the ProducerRecord object is no longer
available at the time the batch is dequeued.
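
A quick sketch of how an interceptor could use this to measure accumulator
time (onDequeued() is only being proposed here, so the surrounding class
is hypothetical and the histogram sink is omitted):

import org.apache.kafka.common.TopicPartition;

public class AccumulatorLatencyTracker {
    // Proposed callback: invoked per record when its batch is drained.
    public void onDequeued(TopicPartition tp, long appendTime, int attempts,
                           boolean moreQueued) {
        long queuedMs = System.currentTimeMillis() - appendTime;
        // Feed queuedMs into a per-topic latency histogram; 'attempts'
        // surfaces retries, 'moreQueued' whether records remain queued.
        System.out.printf("%s queued %d ms (attempt %d, more=%b)%n",
                tp, queuedMs, attempts, moreQueued);
    }
}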

Feedback on ProducerInterceptor.onDequeued() API is welcome.

Thanks,
Anna




On Mon, Jan 25, 2016 at 1:37 PM, Joel Koshy  wrote:

> I'm definitely in favor of having such hooks in the produce/consume
> life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> pretty much how it was:
>
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> i.e., we had something similar to the interceptor proposal for various
> stages of the producer request. The producer provided call-backs for
> beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> LinkedIn we in fact did auditing within these call-backs (and not
> explicitly in the wrapper). Over time and with 0.8 we moved that out to the
> wrapper libraries.
>
> On a side-note while audit and other monitoring can be done internally in a
> convenient way I think it should be clarified that having a wrapper is in
> general not a bad idea and I would even consider it to be a best-practice.
> Even with 0.7 we still had a wrapper library and that API has largely
> stayed the same and has helped protect against (sometimes backwards
> incompatible) changes in open source.
>
> While we are on this topic I have one comment and Anna, you may have
> already considered this but I don't see mention of it in the KIP:
>
> Add a custom message interceptor/validator on the broker on message
> arrival.
>
> We decompress and do basic validation of messages on arrival. I think there
> is value in supporting custom validation and expanding it to support custom
> on-arrival processing. Here is a specific use-case I have in mind. The blog
> that James referenced describes our auditing infrastructure. In order to
> audit the Kafka cluster itself we need to run a "console auditor" service
> that consumes everything and spits out audit events back to the cluster. I
> would prefer not having to run this service because:
>
>- Well, it is one more service that we have to run and monitor
>- Consuming everything takes up bandwidth which can be avoided
>- The console auditor consumer itself can lag and cause temporary audit
>discrepancies
>
> One way we can mitigate this is by having mirror-makers in between clusters
> emit audit events. The problem is that the very last cluster in the
> pipeline will not have any audit which is why we need to have something to
> audit the cluster.
>
> If we had a custom message validator then the audit can be done on-arrival
> and we won't need a console auditor.
>
> One potential issue in this approach and any elaborate on-arrival
> processing for that matter is that you may need to deserialize the message
> as well which can drive up produce request handling times. However I'm not
> terribly concerned about that especially if the audit header can be
> separated out easily or even deserialized partially as this Avro thread
> touches on
>
> http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
>
> Thanks,
>
> Joel
>
> On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > Nice KIP. Excellent idea.
> > Was just thinking if we can add onDequeued() to the ProducerInterceptor
> > interface. Since we have the onEnqueued(), it will help the client or the
> > tools to know how much time the message spent in the RecordAccumulator.
> > Also an API to check if there are any messages left for a particular
> topic
> > in the RecordAccumulator would help.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino  wrote:
> >
> > > Great idea. I’ve been talking about this for 2 years, and I’m glad
> > someone
> > > is finally picking it up. Will take a look at the KIP at some point
> > > shortly.
> > >
> > > -Todd
> > >
> > >
> > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps  wrote:
> > >
> > > > Hey Becket,
> > > >
> > > > Yeah this is really similar to the callback. The difference is really
> > in
> > > > who sets the behavior. The idea of the interceptor is that it doesn't
> > > > require any code change in apps so you can 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Mayuresh Gharat
Hi Anna,

Agreed, it's difficult to match which call of onDequeued() corresponds to
onEnqueued(), since the ProducerRecord object is no longer available at the
time the batch is dequeued.

We currently update the lastAppendTime whenever we append a record to a
batch. We can build on that and keep the append time at the record level,
capturing it for each record as it is appended in tryAppend().

Also, this might be orthogonal, but is there a plan to expose an API to
check whether there are messages in the RecordAccumulator for a particular
topic? I have a use case that I can discuss if we plan for this API.

Thanks,

Mayuresh
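
For illustration, here is a rough sketch of how a record-level append time
captured in tryAppend() could feed the onDequeued() callback discussed in
this thread. The DequeueListener interface below is hypothetical -- it
mirrors the proposed (never released) signature -- and the max-queue-time
bookkeeping is just an example:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical callback mirroring the onDequeued() proposed in this
    // thread (not a released API): invoked once per record as
    // RecordAccumulator.drain() pulls it into an outgoing batch.
    interface DequeueListener {
        void onDequeued(TopicPartition tp, long appendTime, int attempts,
                        boolean moreQueued);
    }

    // Turns the record-level append time (captured in tryAppend(), as
    // suggested above) into a per-topic max queue-time measurement.
    class QueueTimeListener implements DequeueListener {
        private final ConcurrentMap<String, Long> maxQueueTimeMs =
                new ConcurrentHashMap<>();

        @Override
        public void onDequeued(TopicPartition tp, long appendTime,
                               int attempts, boolean moreQueued) {
            long queueTimeMs = System.currentTimeMillis() - appendTime;
            maxQueueTimeMs.merge(tp.topic(), queueTimeMs, Math::max);
        }
    }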


On Mon, Jan 25, 2016 at 4:49 PM, Anna Povzner  wrote:

> Hi Mayuresh,
>
> I agree that the onDequeued() callback in ProducerInterceptor would be useful.
> The API of this callback needs discussion. I propose the following API:
>
> public void onDequeued(TopicPartition tp, long appendTime, int attempts,
> boolean moreQueued);
>
> This callback will be called for every record in a RecordBatch from the
> RecordAccumulator.drain() method. This API will require keeping the
> appendTime of every record in RecordBatch.Thunk. An alternative way for the
> interceptor to calculate time spent in the accumulator is to record append
> time itself on onEnqueued() and then record dequeued time in onDequeued().
> However, there is no way to match which call of onDequeued() corresponds to
> onEnqueued(), since the ProducerRecord object is not available anymore at the
> time the batch is dequeued.
>
> Feedback on ProducerInterceptor.onDequeued() API is welcome.
>
> Thanks,
> Anna
>
>
>
>
> On Mon, Jan 25, 2016 at 1:37 PM, Joel Koshy  wrote:
>
> > I'm definitely in favor of having such hooks in the produce/consume
> > life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> > pretty much how it was:
> >
> >
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > i.e., we had something similar to the interceptor proposal for various
> > stages of the producer request. The producer provided call-backs for
> > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> > LinkedIn we in fact did auditing within these call-backs (and not
> > explicitly in the wrapper). Over time and with 0.8 we moved that out to
> the
> > wrapper libraries.
> >
> > On a side-note while audit and other monitoring can be done internally
> in a
> > convenient way I think it should be clarified that having a wrapper is in
> > general not a bad idea and I would even consider it to be a
> best-practice.
> > Even with 0.7 we still had a wrapper library and that API has largely
> > stayed the same and has helped protect against (sometimes backwards
> > incompatible) changes in open source.
> >
> > While we are on this topic I have one comment and Anna, you may have
> > already considered this but I don't see mention of it in the KIP:
> >
> > Add a custom message interceptor/validator on the broker on message
> > arrival.
> >
> > We decompress and do basic validation of messages on arrival. I think
> there
> > is value in supporting custom validation and expanding it to support custom
> > on-arrival processing. Here is a specific use-case I have in mind. The
> blog
> > that James referenced describes our auditing infrastructure. In order to
> > audit the Kafka cluster itself we need to run a "console auditor" service
> > that consumes everything and spits out audit events back to the cluster.
> I
> > would prefer not having to run this service because:
> >
> >- Well, it is one more service that we have to run and monitor
> >- Consuming everything takes up bandwidth which can be avoided
> >- The console auditor consumer itself can lag and cause temporary
> audit
> >discrepancies
> >
> > One way we can mitigate this is by having mirror-makers in between
> clusters
> > emit audit events. The problem is that the very last cluster in the
> > pipeline will not have any audit which is why we need to have something
> to
> > audit the cluster.
> >
> > If we had a custom message validator then the audit can be done
> on-arrival
> > and we won't need a console auditor.
> >
> > One potential issue in this approach and any elaborate on-arrival
> > processing for that matter is that you may need to deserialize the
> message
> > as well which can drive up produce request handling times. However I'm
> not
> > terribly concerned about that especially if the audit header can be
> > separated out easily or even deserialized partially as this Avro thread
> > touches on
> >
> >
> http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
> >
> > Thanks,
> >
> > Joel
> >
> > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com> wrote:
> >
> > > Nice KIP. Excellent idea.
> > > Was just thinking if we can add onDequeued() to the ProducerInterceptor
> > > interface. Since we have the onEnqueued(), it will help the client or the
> > > tools to know how much time the message spent in the RecordAccumulator.

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Anna Povzner
Hi Mayuresh,

Could you please describe a use case for checking whether there are
messages in the RecordAccumulator for a particular topic? In this KIP, we
are proposing interceptor callbacks that notify about the message, not a
query API. It is possible to add certain info to the callback API, like
the 'moreQueued' boolean parameter I proposed for the onDequeued()
callback.

However, we need to find a good balance between exposing producer and
consumer internals for finer-grained monitoring and exposing so much that
changing the internal implementation forces changes to the interceptor
interfaces. Let's discuss this during tomorrow's KIP meeting.

Thanks,
Anna
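
As a purely hypothetical illustration of that 'moreQueued' flag, reusing
the DequeueListener interface sketched earlier: a callback could observe a
partition's backlog draining without any query API:

    // Hypothetical use of the proposed moreQueued flag: when false, the
    // drained record was the last one queued for that partition, so the
    // listener observes "accumulator empty" for it without a query API.
    class DrainTracker implements DequeueListener {
        @Override
        public void onDequeued(TopicPartition tp, long appendTime,
                               int attempts, boolean moreQueued) {
            if (!moreQueued) {
                System.out.println("Accumulator drained for " + tp);
            }
        }
    }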

On Mon, Jan 25, 2016 at 7:26 PM, Mayuresh Gharat  wrote:

> Hi Anna,
>
> Agreed, it's difficult to match which call of onDequeued() corresponds to
> which onEnqueued(), since the ProducerRecord object is no longer available
> at the time the batch is dequeued.
>
> We currently update the lastAppendTime whenever we append a record to a
> batch. We can build on that and keep the append time at the record level,
> capturing it for each record as it is appended in tryAppend().
>
> Also, this might be orthogonal, but is there a plan to expose an API to
> check whether there are messages in the RecordAccumulator for a particular
> topic? I have a use case that I can discuss if we plan for this API.
>
> Thanks,
>
> Mayuresh
>
>
> On Mon, Jan 25, 2016 at 4:49 PM, Anna Povzner  wrote:
>
> > Hi Mayuresh,
> >
> > I agree that the onDequeued() callback in ProducerInterceptor would be useful.
> > The API of this callback needs discussion. I propose the following API:
> >
> > public void onDequeued(TopicPartition tp, long appendTime, int attempts,
> > boolean moreQueued);
> >
> > This callback will be called for every record in a RecordBatch from the
> > RecordAccumulator.drain() method. This API will require keeping the
> > appendTime of every record in RecordBatch.Thunk. An alternative way for the
> > interceptor to calculate time spent in the accumulator is to record
> append
> > time itself on onEnqueued() and then record dequeued time in
> onDequeued().
> > However, there is no way to match which call of onDequeued() corresponds
> to
> > onEnqueued(), since the ProducerRecord object is not available anymore at the
> > time the batch is dequeued.
> >
> > Feedback on ProducerInterceptor.onDequeued() API is welcome.
> >
> > Thanks,
> > Anna
> >
> >
> >
> >
> > On Mon, Jan 25, 2016 at 1:37 PM, Joel Koshy  wrote:
> >
> > > I'm definitely in favor of having such hooks in the produce/consume
> > > life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> > > pretty much how it was:
> > >
> > >
> >
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> > > i.e., we had something similar to the interceptor proposal for various
> > > stages of the producer request. The producer provided call-backs for
> > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> > > LinkedIn we in fact did auditing within these call-backs (and not
> > > explicitly in the wrapper). Over time and with 0.8 we moved that out to
> > the
> > > wrapper libraries.
> > >
> > > On a side-note while audit and other monitoring can be done internally
> > in a
> > > convenient way I think it should be clarified that having a wrapper is
> in
> > > general not a bad idea and I would even consider it to be a
> > best-practice.
> > > Even with 0.7 we still had a wrapper library and that API has largely
> > > stayed the same and has helped protect against (sometimes backwards
> > > incompatible) changes in open source.
> > >
> > > While we are on this topic I have one comment and Anna, you may have
> > > already considered this but I don't see mention of it in the KIP:
> > >
> > > Add a custom message interceptor/validator on the broker on message
> > > arrival.
> > >
> > > We decompress and do basic validation of messages on arrival. I think
> > there
> > > is value in supporting custom validation and expanding it to support
> custom
> > > on-arrival processing. Here is a specific use-case I have in mind. The
> > blog
> > > that James referenced describes our auditing infrastructure. In order
> to
> > > audit the Kafka cluster itself we need to run a "console auditor"
> service
> > > that consumes everything and spits out audit events back to the
> cluster.
> > I
> > > would prefer not having to run this service because:
> > >
> > >- Well, it is one more service that we have to run and monitor
> > >- Consuming everything takes up bandwidth which can be avoided
> > >- The console auditor consumer itself can lag and cause temporary
> > audit
> > >discrepancies
> > >
> > > One way we can mitigate this is by having mirror-makers in between
> > clusters
> > > emit audit events. The problem is that the very last cluster in the
> > > pipeline will not have any audit which is why we need to have something
> > to audit the cluster.

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Jay Kreps
I think there is some tension here between exposing lots of hooks that you
can implement and giving a simple, supportable interface.

Personally I would argue for not adding onDequeued() and instead removing
both onEnqueued() and onReceived().

My argument is that onSend and onAcknowledgment are fundamental aspects of
the producer protocol. This means they are easy to understand, and they
don't depend on our implementation at all. I think this implies we can
commit to the API over the long haul. The same is true on the consumer side
for onConsume and onCommit.

For example, whether onConsume should take the serialized or deserialized
value is very dependent on whether we choose to do serialization lazily or
not, which is still totally a matter of debate.

I think maybe the best way to figure this out would be to try to come up
with a set of concrete use cases that justify each method. And then we can
think if it is worth it to include or not. I am a little wary of arguments
along the lines of "it could be useful" because of course you could say
that about any point in the code path.

For onDequeued, I think we already have the queue-time metric in the
producer, right? That already provides insight into the time spent in the
queue without a need for a plugin (I may have misunderstood the use case,
though). Are there other use cases?

-Jay
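
For reference, a minimal sketch of an interceptor restricted to exactly
those two protocol-level hooks. This matches the shape of the
ProducerInterceptor interface that KIP-42 ultimately shipped; the counting
logic is just an example:

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Only the two protocol-level hooks: onSend (before the record enters
    // the producer) and onAcknowledgement (when the broker response, or a
    // failure, arrives). Nothing here depends on producer internals.
    public class CountingInterceptor implements ProducerInterceptor<String, String> {
        private final AtomicLong sent = new AtomicLong();
        private final AtomicLong acked = new AtomicLong();
        private final AtomicLong failed = new AtomicLong();

        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            sent.incrementAndGet();
            return record;  // pass the record through unmodified
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                acked.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
        }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> configs) { }
    }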


On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Nice KIP. Excellent idea.
> > Was just thinking if we can add onDequeued() to the ProducerInterceptor
> interface. Since we have the onEnqueued(), it will help the client or the
> tools to know how much time the message spent in the RecordAccumulator.
> Also an API to check if there are any messages left for a particular topic
> in the RecordAccumulator would help.
>
> Thanks,
>
> Mayuresh
>
> On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino  wrote:
>
> > Great idea. I’ve been talking about this for 2 years, and I’m glad
> someone
> > is finally picking it up. Will take a look at the KIP at some point
> > shortly.
> >
> > -Todd
> >
> >
> > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps  wrote:
> >
> > > Hey Becket,
> > >
> > > Yeah this is really similar to the callback. The difference is really
> in
> > > who sets the behavior. The idea of the interceptor is that it doesn't
> > > require any code change in apps so you can globally add behavior to
> your
> > > Kafka usage without changing app code. Whereas the callback is added by
> > the
> > > app. The idea is to kind of obviate the need for the wrapper code that
> > e.g.
> > > LinkedIn maintains to hold this kind of stuff.
> > >
> > > -Jay
> > >
> > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin 
> > wrote:
> > >
> > > > This could be a useful feature. And I think there are some use cases
> to
> > > > mutate the data, like the one mentioned in the rejected alternatives.
> > > >
> > > > > I am wondering if there is functional overlap between
> > > > ProducerInterceptor.onAcknowledgement() and the producer callback? I
> > can
> > > > see that the Callback could be a per record setting while
> > > > onAcknowledgement() is a producer level setting. Other than that, is
> > > there
> > > > any difference between them?
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede 
> > > wrote:
> > > >
> > > > > James,
> > > > >
> > > > > That is one of the many monitoring use cases for the interceptor
> > > > interface.
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > >
> > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng 
> > wrote:
> > > > >
> > > > > > Anna,
> > > > > >
> > > > > > I'm trying to understand a concrete use case. It sounds like
> > producer
> > > > > > > interceptors could be used to implement part of LinkedIn's Kafka
> > > Audit
> > > > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale
> > > > > >
> > > > > > Part of that is done by a wrapper library around the Kafka
> producer
> > > > that
> > > > > > keeps a count of the number of messages produced, and then sends
> > that
> > > > > count
> > > > > > to a side-topic. It sounds like the producer interceptors could
> > > > possibly
> > > > > be
> > > > > > used to implement that?
> > > > > >
> > > > > > -James
> > > > > >
> > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner 
> > > wrote:
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I just created a KIP-42 for adding producer and consumer
> > > interceptors
> > > > > for
> > > > > > > intercepting messages at different points on producer and
> > consumer.
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > > > > > >
> > > > > > > Comments and suggestions are welcome!
> > > > > > >
> > > > > > > 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Anna Povzner
Hi Joel,

Yes, we considered interceptors on the broker side -- they make a lot of
sense and will add more detail to monitoring. We propose to do it later in
a separate KIP because: 1) broker interceptors are riskier, since brokers
are more sensitive to overheads; 2) producer and consumer interceptors
alone already enable end-to-end monitoring, and thus offer a better
reward-vs-risk tradeoff; 3) once we have producer and consumer
interceptors, we will gain more experience and see how they are used; and
4) with that experience, we can start working on broker interceptors as a
separate KIP.

I added broker interceptors to Rejected Alternatives section (out-of-scope
for this KIP). Let me know your thoughts.

Thanks,
Anna

On Mon, Jan 25, 2016 at 7:37 PM, Jay Kreps  wrote:

> I think there is some tension here between exposing lots of hooks that you
> can implement and giving a simple, supportable interface.
>
> Personally I would argue for not adding onDequeued() and instead removing
> both onEnqueued() and onReceived().
>
> My argument is that onSend and onAcknowledgment are fundamental aspects of
> the producer protocol. This means they are easy to understand, and they
> don't depend on our implementation at all. I think this implies we can
> commit to the API over the long haul. The same is true on the consumer
> side for onConsume and onCommit.
>
> For example, whether onConsume should take the serialized or deserialized
> value is very dependent on whether we choose to do serialization lazily or
> not, which is still totally a matter of debate.
>
> I think maybe the best way to figure this out would be to try to come up
> with a set of concrete use cases that justify each method. And then we can
> think if it is worth it to include or not. I am a little wary of arguments
> along the lines of "it could be useful" because of course you could say
> that about any point in the code path.
>
> For onDequeued, I think we already have the queue-time metric in the
> producer, right? That already provides insight into the time spent in the
> queue without a need for a plugin (I may have misunderstood the use case,
> though). Are there other use cases?
>
> -Jay
>
>
> On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > Nice KIP. Excellent idea.
> > Was just thinking if we can add onDequeued() to the ProducerInterceptor
> > interface. Since we have the onEnqueued(), it will help the client or the
> > tools to know how much time the message spent in the RecordAccumulator.
> > Also an API to check if there are any messages left for a particular
> topic
> > in the RecordAccumulator would help.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino  wrote:
> >
> > > Great idea. I’ve been talking about this for 2 years, and I’m glad
> > someone
> > > is finally picking it up. Will take a look at the KIP at some point
> > > shortly.
> > >
> > > -Todd
> > >
> > >
> > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps  wrote:
> > >
> > > > Hey Becket,
> > > >
> > > > Yeah this is really similar to the callback. The difference is really
> > in
> > > > who sets the behavior. The idea of the interceptor is that it doesn't
> > > > require any code change in apps so you can globally add behavior to
> > your
> > > > Kafka usage without changing app code. Whereas the callback is added
> by
> > > the
> > > > app. The idea is to kind of obviate the need for the wrapper code
> that
> > > e.g.
> > > > LinkedIn maintains to hold this kind of stuff.
> > > >
> > > > -Jay
> > > >
> > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin 
> > > wrote:
> > > >
> > > > > This could be a useful feature. And I think there are some use
> cases
> > to
> > > > > mutate the data, like the one mentioned in the rejected alternatives.
> > > > >
> > > > > I am wondering if there is functional overlap between
> > > > > ProducerInterceptor.onAcknowledgement() and the producer callback?
> I
> > > can
> > > > > see that the Callback could be a per record setting while
> > > > > onAcknowledgement() is a producer level setting. Other than that,
> is
> > > > there
> > > > > any difference between them?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede 
> > > > wrote:
> > > > >
> > > > > > James,
> > > > > >
> > > > > > That is one of the many monitoring use cases for the interceptor
> > > > > interface.
> > > > > >
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng 
> > > wrote:
> > > > > >
> > > > > > > Anna,
> > > > > > >
> > > > > > > I'm trying to understand a concrete use case. It sounds like
> > > producer
> > > > > > > interceptors could be used to implement part of LinkedIn's
> Kafka
> > > > Audit
> > > > > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale
> 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-25 Thread Jay Kreps
Hey Joel,

What is the interface you are thinking of? Something like this:
onAppend(String topic, int partition, Records records, long time)
?

One challenge right now is that we are still using the old
Message/MessageSet classes on the broker, which I'm not sure we'd want to
support over the long haul, but it might be okay just to create the Records
instance for this interface.

-Jay
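
For concreteness, a sketch of what such a hook could look like with that
signature. Every name here is hypothetical -- no such broker interface
exists, and Records/Record/AuditEmitter stand in for whatever message-set
container and audit sink a real implementation would use:

    // Hypothetical on-arrival hook using the signature floated above;
    // no such broker interface exists.
    interface OnArrivalProcessor {
        void onAppend(String topic, int partition, Records records, long time);
    }

    // Example: emit audit counts on arrival, removing the need for the
    // console-auditor consumer described in the quoted message below.
    class ArrivalAuditor implements OnArrivalProcessor {
        @Override
        public void onAppend(String topic, int partition, Records records,
                             long time) {
            int count = 0;
            for (Record r : records.records()) {  // placeholder iteration
                count++;
            }
            AuditEmitter.emit(topic, partition, count, time);  // made-up sink
        }
    }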

On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy  wrote:

> I'm definitely in favor of having such hooks in the produce/consume
> life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> pretty much how it was:
>
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
> i.e., we had something similar to the interceptor proposal for various
> stages of the producer request. The producer provided call-backs for
> beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> LinkedIn we in fact did auditing within these call-backs (and not
> explicitly in the wrapper). Over time and with 0.8 we moved that out to the
> wrapper libraries.
>
> On a side-note while audit and other monitoring can be done internally in a
> convenient way I think it should be clarified that having a wrapper is in
> general not a bad idea and I would even consider it to be a best-practice.
> Even with 0.7 we still had a wrapper library and that API has largely
> stayed the same and has helped protect against (sometimes backwards
> incompatible) changes in open source.
>
> While we are on this topic I have one comment and Anna, you may have
> already considered this but I don't see mention of it in the KIP:
>
> Add a custom message interceptor/validator on the broker on message
> arrival.
>
> We decompress and do basic validation of messages on arrival. I think there
> is value in supporting custom validation and expanding it to support custom
> on-arrival processing. Here is a specific use-case I have in mind. The blog
> that James referenced describes our auditing infrastructure. In order to
> audit the Kafka cluster itself we need to run a "console auditor" service
> that consumes everything and spits out audit events back to the cluster. I
> would prefer not having to run this service because:
>
>- Well, it is one more service that we have to run and monitor
>- Consuming everything takes up bandwidth which can be avoided
>- The console auditor consumer itself can lag and cause temporary audit
>discrepancies
>
> One way we can mitigate this is by having mirror-makers in between clusters
> emit audit events. The problem is that the very last cluster in the
> pipeline will not have any audit which is why we need to have something to
> audit the cluster.
>
> If we had a custom message validator then the audit can be done on-arrival
> and we won't need a console auditor.
>
> One potential issue in this approach and any elaborate on-arrival
> processing for that matter is that you may need to deserialize the message
> as well which can drive up produce request handling times. However I'm not
> terribly concerned about that especially if the audit header can be
> separated out easily or even deserialized partially as this Avro thread
> touches on
>
> http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
>
> Thanks,
>
> Joel
>
> On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > Nice KIP. Excellent idea.
> > Was just thinking if we can add onDequeued() to the ProducerInterceptor
> > interface. Since we have the onEnqueued(), it will help the client or the
> > tools to know how much time the message spent in the RecordAccumulator.
> > Also an API to check if there are any messages left for a particular
> topic
> > in the RecordAccumulator would help.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino  wrote:
> >
> > > Great idea. I’ve been talking about this for 2 years, and I’m glad
> > someone
> > > is finally picking it up. Will take a look at the KIP at some point
> > > shortly.
> > >
> > > -Todd
> > >
> > >
> > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps  wrote:
> > >
> > > > Hey Becket,
> > > >
> > > > Yeah this is really similar to the callback. The difference is really
> > in
> > > > who sets the behavior. The idea of the interceptor is that it doesn't
> > > > require any code change in apps so you can globally add behavior to
> > your
> > > > Kafka usage without changing app code. Whereas the callback is added
> by
> > > the
> > > > app. The idea is to kind of obviate the need for the wrapper code
> that
> > > e.g.
> > > > LinkedIn maintains to hold this kind of stuff.
> > > >
> > > > -Jay
> > > >
> > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin 
> > > wrote:
> > > >
> > > > > This could be a useful feature. And I think there are some use cases
> > > > > to mutate the data, like the one mentioned in the rejected alternatives.
> 

Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-24 Thread Becket Qin
This could be a useful feature. And I think there are some use cases to
mutate the data, like the one mentioned in the rejected alternatives.

I am wondering whether there is functional overlap between
ProducerInterceptor.onAcknowledgement() and the producer callback. I can
see that the Callback is a per-record setting while onAcknowledgement() is
a producer-level setting. Other than that, is there any difference between
them?

Thanks,

Jiangjie (Becket) Qin
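
To make the overlap concrete, here is a small sketch contrasting the two:
the Callback is supplied per send() call by the application, while an
interceptor is installed once for the whole producer via the
interceptor.classes config that KIP-42 went on to add (the interceptor
class name below is illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CallbackVsInterceptor {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // Producer-level: one interceptor sees every record's
            // acknowledgement (class name is illustrative).
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    "com.example.CountingInterceptor");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // Record-level: this callback fires only for this one send().
                producer.send(new ProducerRecord<>("events", "key", "value"),
                        (metadata, exception) -> {
                            if (exception != null) exception.printStackTrace();
                        });
            }
        }
    }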

On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede  wrote:

> James,
>
> That is one of the many monitoring use cases for the interceptor interface.
>
> Thanks,
> Neha
>
> On Fri, Jan 22, 2016 at 6:18 PM, James Cheng  wrote:
>
> > Anna,
> >
> > I'm trying to understand a concrete use case. It sounds like producer
> > interceptors could be used to implement part of LinkedIn's Kafka Audit
> > tool? https://engineering.linkedin.com/kafka/running-kafka-scale
> >
> > Part of that is done by a wrapper library around the Kafka producer that
> > keeps a count of the number of messages produced, and then sends that
> count
> > to a side-topic. It sounds like the producer interceptors could possibly
> be
> > used to implement that?
> >
> > -James
> >
> > > On Jan 22, 2016, at 4:33 PM, Anna Povzner  wrote:
> > >
> > > Hi,
> > >
> > > I just created a KIP-42 for adding producer and consumer interceptors
> for
> > > intercepting messages at different points on producer and consumer.
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> > >
> > > Comments and suggestions are welcome!
> > >
> > > Thanks,
> > > Anna
> >
> >
> >
>
>
>
> --
> Thanks,
> Neha
>


[DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-22 Thread Anna Povzner
Hi,

I just created a KIP-42 for adding producer and consumer interceptors for
intercepting messages at different points on producer and consumer.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

Comments and suggestions are welcome!

Thanks,
Anna


Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-22 Thread James Cheng
Anna,

I'm trying to understand a concrete use case. It sounds like producer
interceptors could be used to implement part of LinkedIn's Kafka Audit tool?
https://engineering.linkedin.com/kafka/running-kafka-scale

Part of that is done by a wrapper library around the Kafka producer that keeps
a count of the number of messages produced, and then sends that count to a
side-topic. It sounds like the producer interceptors could possibly be used to
implement that?

-James
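
A rough sketch of that counting logic expressed as an interceptor instead
of a wrapper, using the ProducerInterceptor shape that KIP-42 eventually
took. The "_audit" side-topic name and the count-record format are invented
for illustration:

    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Sketch of the wrapper-library audit logic as an interceptor: count
    // records per topic in onSend() and publish the counts to a side topic.
    // The "_audit" topic name and the key/value format are made up.
    public class AuditCountInterceptor implements ProducerInterceptor<String, String> {
        private final ConcurrentMap<String, Long> counts = new ConcurrentHashMap<>();
        private Producer<String, String> auditProducer;

        @Override
        public void configure(Map<String, ?> configs) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // A plain producer with no interceptors, so audit sends are not
            // themselves intercepted.
            auditProducer = new KafkaProducer<>(props);
        }

        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            counts.merge(record.topic(), 1L, Long::sum);
            return record;  // pass through unmodified
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

        @Override
        public void close() {
            // Flush final per-topic counts to the side topic on shutdown.
            counts.forEach((topic, n) -> auditProducer.send(
                    new ProducerRecord<>("_audit", topic, Long.toString(n))));
            auditProducer.close();
        }
    }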

> On Jan 22, 2016, at 4:33 PM, Anna Povzner  wrote:
>
> Hi,
>
> I just created a KIP-42 for adding producer and consumer interceptors for
> intercepting messages at different points on producer and consumer.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
>
> Comments and suggestions are welcome!
>
> Thanks,
> Anna






Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors

2016-01-22 Thread Neha Narkhede
James,

That is one of the many monitoring use cases for the interceptor interface.

Thanks,
Neha

On Fri, Jan 22, 2016 at 6:18 PM, James Cheng  wrote:

> Anna,
>
> I'm trying to understand a concrete use case. It sounds like producer
> interceptors could be used to implement part of LinkedIn's Kafka Audit
> tool? https://engineering.linkedin.com/kafka/running-kafka-scale
>
> Part of that is done by a wrapper library around the Kafka producer that
> keeps a count of the number of messages produced, and then sends that count
> to a side-topic. It sounds like the producer interceptors could possibly be
> used to implement that?
>
> -James
>
> > On Jan 22, 2016, at 4:33 PM, Anna Povzner  wrote:
> >
> > Hi,
> >
> > I just created a KIP-42 for adding producer and consumer interceptors for
> > intercepting messages at different points on producer and consumer.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
> >
> > Comments and suggestions are welcome!
> >
> > Thanks,
> > Anna
>
>
>



-- 
Thanks,
Neha