Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Nice finding on the CRC class. It will be great to switch to that at some point. On exposing the CRC - I think we are overthinking the problem. CRC over the entire record makes sense for the durability check and having multiple CRCs is a bad idea. On Fri, Jan 29, 2016 at 4:47 PM, Jay Kreps wrote: > The rationale for the CRC covering the whole record was to check corruption > in the full record contents as corruption there will equally prevent > someone trying to consume the data. I think you could argue either way but > let's definitely not end up with two different CRC calculations, that would > just be indecisive. > > That is a super interesting finding about the CRC class. I think you are > right that it became an intrinsic in java 8 with hw support if available. > We should really switch to that. The CRC calculation is still one of the > biggest bottlenecks in the producer and consumer so that should > significantly speed things up. > > -Jay > > On Fri, Jan 29, 2016 at 4:30 PM, Becket Qin wrote: > > > Hi Anna, > > > > I think there is value if CRC for only user bytes can be used. This will > > help when we have future protocol updates. Otherwise any protocol > migration > > might break auditing if it largely relies on CRC including system bytes. > > > > I did some test to understand the performance overhead of having a > separate > > user bytes CRC, > > > > On my desktop, it took ~4.7 seconds to compute CRC for 10G bytes. So for > > each megabyte it takes 0.47 ms. > > To compare with compression cost, I used the compressor used by producer. > > The test uses GZIP and compress 1MB of data for each batch. > > It takes: > > ~90 seconds to compress 10GB bytes that contains all same byte. > > ~470 seconds to compress 10GB bytes containing random bytes. > > >1000 seconds to compress 10GB bytes with 4, 8, 16, 32 and 64 patterns. > (I > > guess it takes time to update the pattern mapping) > > > > So the overhead of a separate CRC computing is <1% considering the > producer > > do a lot of things other than compression. > > > > I am not sure if this overhead is worth taking or not, because we do > have a > > huge number of messages to send, so any overhead should be avoided if > > possible. > > > > BTW. Another interesting finding is that the Crc32 class used in clients > > which used to be faster than java CRC32 in Java 1.6. It seems no longer > the > > case. What I see is that Java CRC32 class is 2x faster than the Crc32 > class > > we are using now. > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > > > On Fri, Jan 29, 2016 at 1:46 PM, Anna Povzner wrote: > > > > > Joel, thanks for your feedback. I updated the wiki based on your > comments > > > about the wiki writeup. > > > > > > > > > On Fri, Jan 29, 2016 at 11:50 AM, Anna Povzner > > wrote: > > > > > > > Becket, > > > > > > > > In your scenario with one message from producer A and one message > from > > > > producer B, those are two different messages, and they should be > > tracked > > > as > > > > two different messages. So I would argue for using record CRC -- CRC > > that > > > > is actually used by the system + it will not require computing a > > > different > > > > CRC again which will add performance overhead. > > > > > > > > If the broker ever changes the CRC, the scenarios when that happens > > > should > > > > be very well defined. 
As far as I know, the scenarios when CRC is > > > > overwritten by the broker (including KIP-31/32 changes): > > > > -- topic config is LogAppendTime for timestamp type > > > > -- upgrade/downgrade > > > > -- compression codec change (which could be inferred from config). > > > > > > > > Monitoring/audit just needs to know when CRCs are safe to use, which > is > > > > most often is known from config. In the future, this can be further > > > > addressed by broker interceptors. > > > > > > > > Thanks, > > > > Anna > > > > > > > > > > > > > > > > > > > > On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin > > > wrote: > > > > > > > >> Neha, > > > >> > > > >> CRC is definitely an important type of metadata of a record. I am > not > > > >> arguing about that. But I think we should distinguish between two > > types > > > of > > > >> checksum here, 1) the checksum of user data. and 2) the checksum > > > including > > > >> system appended bytes. > > > >> > > > >> I completely agree that (1) is good to add. But I am not sure if we > > > should > > > >> expose (2) to user, because this means any underlying protocol > change > > > will > > > >> give a different CRC for exact same message. For example, let's say > > > >> producer A is sending message with timestamp. Producer B is sending > > > >> message > > > >> without timestamp. Even they are given the exact same message, the > CRC > > > >> returned would be different. > > > >> > > > >> Also, Kafka broker will modify the system appended bytes in > different > > > >>
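[Editor's illustration, not code from the thread] The measurement discussed above is easy to reproduce. Below is a rough, JMH-free sketch that checksums 10 GB in 1 MB chunks using the JDK's java.util.zip.CRC32, the class the thread proposes switching to. On Java 8+ its update loop is intrinsified and can use hardware CRC support where available; exact numbers will vary by JVM and machine.

```java
import java.util.zip.CRC32;

// Minimal throughput sketch: CRC32 over 10 GB of data, 1 MB at a time.
public class CrcThroughput {
    public static void main(String[] args) {
        byte[] chunk = new byte[1 << 20];          // 1 MB buffer of zeros
        long totalBytes = 10L << 30;               // 10 GB in total
        CRC32 crc = new CRC32();
        long start = System.nanoTime();
        for (long done = 0; done < totalBytes; done += chunk.length) {
            crc.update(chunk, 0, chunk.length);    // one running checksum; keeping it avoids dead-code elimination
        }
        double secs = (System.nanoTime() - start) / 1e9;
        System.out.printf("checksum=%d, %.2f s total, %.3f ms/MB%n",
                crc.getValue(), secs, secs * 1000.0 / (totalBytes >> 20));
    }
}
```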
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Becket, Is your concern the presence of CRC in the RecordMetadata or do you want to brainstorm how CRC can be used for auditing? I think we shouldn't try to think about the various ways that people can do monitoring using interceptors and the metadata we provide. The entire point of having pluggable interceptors is so that people can employ their own creative mechanisms to make use of interceptors. I do think that it is worth discussing whether or not CRC makes sense as record metadata to the user. My take is that the CRC is the best size-bounded summary of serialized record content available to us, and one that would be expensive for the user to recompute. I'd argue this summary of a record qualifies as its metadata. After all, we use the record CRC for a very important durability check as the record travels through the system. > 1. Isn't the TopicPartition + Offset already uniquely identified a message? > It seems better than CRC no matter from summary point of view or auditing > point of view. The offset is a system-assigned value of uniqueness to the message. If you trusted the system that much, you are not looking to monitor it out-of-band :-) > 2. Currently CRC only has 4 bytes. So it will have collision when there are > more than ~4 billion messages. Take LinkedIn as an example, we have 1.3 > trillion messages per day. So there will be at least a couple of hundreds > collision for each CRC code every day, whereas TopicPartition+Offset will > not have any collision. The CRC isn't sent over the wire and doesn't add any extra overhead in processing, so what is your concern? If you aren't convinced about its usefulness, you can always use the default do-nothing interceptor at LinkedIn and ignore the CRC. > Without having > the entire message bytes, they may not be able to verify its correctness, > and the CRC could even be invalid if the broker ever overwritten any field > or did format conversion. > This doesn't make sense to me. The CRC is used for the most important durability check by Kafka - to verify that the message was not garbled over the wire. The system can't change it; it has to match on the consumer side or we will not return it to the user. On Fri, Jan 29, 2016 at 3:23 AM, Becket Qin wrote: > Anna, > > It is still not clear to me why we should expose CRC to end user. > Followings are my confusions. > > 1. Isn't the TopicPartition + Offset already uniquely identified a message? > It seems better than CRC no matter from summary point of view or auditing > point of view. > > 2. Currently CRC only has 4 bytes. So it will have collision when there are > more than ~4 billion messages. Take LinkedIn as an example, we have 1.3 > trillion messages per day. So there will be at least a couple of hundreds > collision for each CRC code every day, whereas TopicPartition+Offset will > not have any collision. > > 3. CRC is calculated after all the fields have been filled in by producer, > including timestamp, attributes, etc. It might also get recomputed on > broker side. So if users only get CRC from record metadata. Without having > the entire message bytes, they may not be able to verify its correctness, > and the CRC could even be invalid if the broker ever overwritten any field > or did format conversion. > > Thanks, > > Jiangjie (Becket) Qin > > > > > On Thu, Jan 28, 2016 at 5:58 PM, Anna Povzner wrote: > > > On a second thought, yes, I think we should expose record size that > > represents application bytes. This is Becket's option #1. 
> > > > I updated the KIP wiki with new fields in RecordMetadata and > > ConsumerRecord. > > > > I would like to start a voting thread tomorrow if there are no objections > > or more things to discuss. > > > > Thanks, > > Anna > > > > > > > > On Thu, Jan 28, 2016 at 2:25 PM, Anna Povzner wrote: > > > > > Regarding record size as bytes sent over the wire, my concern is that > it > > > is almost impossible to calculate per-record. We could do as: 1) > > compressed > > > bytes / number of records in a compressed message, as Todd mentioned; > or > > 2) > > > or same as #1 but take it proportional to uncompressed record size vs. > > > total uncompressed size of records. All of these calculations give us > an > > > estimate. So maybe record size as bytes sent over the wire is not a > > > per-record metadata, but rather per topic/partition measure that is > > better > > > to be exposed through metrics? > > > > > > > > > On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino > wrote: > > > > > >> It may be difficult (or nearly impossible) to get actual compressed > > bytes > > >> for a message from a compressed batch, but I do think it’s useful > > >> information to have available for the very reason noted, bandwidth > > >> consumed. Does it make sense to have an interceptor at the batch level > > >> that > > >> can provide this? The other option is to estimate it (such as making > an > >
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Anna, It is still not clear to me why we should expose the CRC to the end user. The following are my points of confusion. 1. Doesn't TopicPartition + Offset already uniquely identify a message? It seems better than a CRC from both the summary point of view and the auditing point of view. 2. Currently the CRC has only 4 bytes, so there will be collisions once there are more than ~4 billion messages. Take LinkedIn as an example: we have 1.3 trillion messages per day, so there will be at least a couple of hundred collisions for each CRC value every day, whereas TopicPartition+Offset will never collide. 3. The CRC is calculated after all the fields have been filled in by the producer, including timestamp, attributes, etc. It might also get recomputed on the broker side. So if users only get the CRC from record metadata, without the entire message bytes they may not be able to verify its correctness, and the CRC could even be invalid if the broker ever overwrote any field or did format conversion. Thanks, Jiangjie (Becket) Qin On Thu, Jan 28, 2016 at 5:58 PM, Anna Povzner wrote: > On a second thought, yes, I think we should expose record size that > represents application bytes. This is Becket's option #1. > > I updated the KIP wiki with new fields in RecordMetadata and > ConsumerRecord. > > I would like to start a voting thread tomorrow if there are no objections > or more things to discuss. > > Thanks, > Anna > > > > On Thu, Jan 28, 2016 at 2:25 PM, Anna Povzner wrote: > > > Regarding record size as bytes sent over the wire, my concern is that > it > > is almost impossible to calculate per-record. We could do as: 1) > > compressed > > bytes / number of records in a compressed message, as Todd mentioned; > or > > 2) > > or same as #1 but take it proportional to uncompressed record size vs. > > > total uncompressed size of records. All of these calculations give us > an > > > estimate. So maybe record size as bytes sent over the wire is not a > > > per-record metadata, but rather per topic/partition measure that is > > better > > > to be exposed through metrics? > > > > > > > > > On Thu, Jan 28, 2016 at 2:09 PM, Todd Palino > wrote: > > > > > >> It may be difficult (or nearly impossible) to get actual compressed > > bytes > > >> for a message from a compressed batch, but I do think it’s useful > > >> information to have available for the very reason noted, bandwidth > > >> consumed. Does it make sense to have an interceptor at the batch level > > >> that > > >> can provide this? The other option is to estimate it (such as making > an > >
I > >> > think it would be hard to get actual bytes sent over the wire (your > #2), > >> > since multiple records get compressed together and we would need to > >> decide > >> > which bytes to account to which record. So I am inclined to only do > your > >> > #1. However, it still makes more sense to me just to return record > size > >> > including the header, since this is the actual record size. > >> > > >> > Thanks, > >> > Anna > >> > > >> > On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin > >> wrote: > >> > > >> > > Anna, > >> > > > >> > > Using CRC to do end2end auditing might be very costly because you > will > >> > need > >> > > to collect all the CRC from both producer and consumer. And it is > >> based > >> > on > >> > > the assumption that broker does not modify the record. > >> > > Can you shed some idea on how end to end auditing will be using the > >> CRC > >> > > before we decide to expose such low level detail to the end user? It > >> > would > >> > > also be helpful if you can compare it with something like sequence > >> number > >> > > based auditing. > >> > > > >> > > About the record size, one thing worth notice is that the size of > >> Record > >> > is > >> > > not the actual bytes sent over the wire if we use compression. So > that > >> > does > >> > > not really tell user how much bandwidth they are using. Personally I > >> > think > >> > > two kinds of size may be useful. > >> > > 1. The record size after serialization, i.e. application bytes. (The > >> >
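[Editor's illustration, not code from the thread] As a back-of-the-envelope check of the collision estimate in Becket's point 2, assuming CRC values were uniformly distributed:

```java
// 1.3 trillion messages per day spread over 2^32 possible 32-bit CRC values.
public class CrcCollisions {
    public static void main(String[] args) {
        long messagesPerDay = 1_300_000_000_000L;      // 1.3 trillion (the LinkedIn example)
        long crcSpace = 1L << 32;                      // 4,294,967,296 distinct values
        System.out.println(messagesPerDay / crcSpace); // ~302 messages share each CRC value per day
    }
}
```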
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Responding to some of the earlier comments in the thread: @Jay/@Neha, I think any one of onCommit/onAppend/onArrival would work for the concrete use-case that I had outlined. I think onArrival is additionally useful for custom validation - i.e., reject the message and do not append if it violates some cluster-specific rule (e.g., if some header timestamp is older than xyz). However, the thing with user-supplied validation is we would have to make do with a (new) generic error code in the producer response. While there is a risk of a broker interceptor having high latency I think that is acceptable since it is the user's responsibility to ensure low latency - the producer call-back and onAcknowledgment interceptor are similar in this regard although those are less risky. Even so, I think there are clear use-cases for broker interceptors so I feel the risk part is something that just needs to be documented. @Jay that is a good point about moving from Message/MessageSet to Records although that may be less difficult to absorb since it is a broker-side interceptor and so people don't need to get a ton of applications in their company to switch to use it. Re: onEnqueued: monitoring serialization latency can be done via metrics but this is more useful for recording whether serialization succeeded or not. onAcknowledgment subsumes this but it also subsumes other possible errors (such as produce errors). It is more fine-grained than most people need though (i.e., I don't think we will use it even if it is present.) Re: checksums: I think it is a good addition to metadata; and for low-volume or super-critical topics it can be used for very strict auditing. There are a couple of typos/edits for the wiki itself: - Under Kafka Producer changes: - you have references to KafkaConsumer constructor and ConsumerConfig.originals. - sendRecord -> sentRecord (may be clearer) - Under ProducerInterceptor interface: there is a mention of onEnqueued which was rejected - Comment for ConsumerRecord.record should probably be: // NEW: record size in bytes (*after decompression*) BTW - Anna, nice work on the KIP! Joel On Fri, Jan 29, 2016 at 6:57 AM, Neha Narkhede wrote: > Becket, > > Is your concern the presence of CRC in the RecordMetadata or do you want to > brainstorm how CRC can be used for auditing? I think we shouldn't try to > think about the various ways that people can do monitoring using > interceptors and the metadata we provide. The entire point of having > pluggable interceptors is so that people can employ their own creative > mechanisms to make use of interceptors. > > I do think that it is worth discussing whether or not CRC makes sense as > record metadata to the user. My take is that the CRC is the best size-bound > summary of serialized record content available to us which is expensive to > recompute if the user were to redo it. I'd argue this summary of a record > qualifies as its metadata. After all, we use the record CRC for a very > important test of the system durability as it travels through the system. > > 1. Isn't the TopicPartition + Offset already uniquely identified a message? > > It seems better than CRC no matter from summary point of view or auditing > > point of view. > > > The offset is a system-assigned value of uniqueness to the message. If you > trusted the system that much, you are not looking to monitor it out-of-band > :-) > > > > 2. Currently CRC only has 4 bytes. So it will have collision when there > are > > more than ~4 billion messages. 
Take LinkedIn as an example, we have 1.3 > > trillion messages per day. So there will be at least a couple of hundreds > > collision for each CRC code every day, whereas TopicPartition+Offset will > > not have any collision. > > > The CRC isn't sent over the wire and doesn't add any extra overhead in > processing, so what is your concern? If you aren't convinced about its > usefulness, you can always use the default do-nothing interceptor at > LinkedIn and ignore the CRC. > > Without having > > the entire message bytes, they may not be able to verify its correctness, > > and the CRC could even be invalid if the broker ever overwritten any > field > > or did format conversion. > > > > This doesn't make sense to me. The CRC is used for the most important > durability check by Kafka - to verify that the message was not garbled over > the wire. The system can't change it; it has to match on the consumer side > or we will not return it to the user. > > On Fri, Jan 29, 2016 at 3:23 AM, Becket Qin wrote: > > > Anna, > > > > It is still not clear to me why we should expose CRC to end user. > > Followings are my confusions. > > > > 1. Isn't the TopicPartition + Offset already uniquely identified a > message? > > It seems better than CRC no matter from summary point of view or auditing > > point of view. > > > > 2. Currently CRC only has 4 bytes. So it will have collision when there >
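[Editor's illustration] To make the onArrival idea above concrete, here is an entirely hypothetical sketch; no such broker-side interface exists in the KIP, and the class, method name, and signature are invented for illustration only.

```java
// Hypothetical broker-side validation of the kind Joel describes: reject a
// record whose header timestamp is older than some cluster-specific bound.
public class MaxAgeArrivalCheck {
    private final long maxAgeMs;

    MaxAgeArrivalCheck(long maxAgeMs) {
        this.maxAgeMs = maxAgeMs;
    }

    /** Returns true if the record may be appended; false would map to a (new) generic error code. */
    boolean onArrival(long recordTimestampMs, long brokerNowMs) {
        return brokerNowMs - recordTimestampMs <= maxAgeMs;
    }
}
```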
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
The rationale for the CRC covering the whole record was to check corruption in the full record contents as corruption there will equally prevent someone trying to consume the data. I think you could argue either way but let's definitely not end up with two different CRC calculations, that would just be indecisive. That is a super interesting finding about the CRC class. I think you are right that it became an intrinsic in Java 8 with hardware support if available. We should really switch to that. The CRC calculation is still one of the biggest bottlenecks in the producer and consumer so that should significantly speed things up. -Jay On Fri, Jan 29, 2016 at 4:30 PM, Becket Qin wrote: > Hi Anna, > > I think there is value if CRC for only user bytes can be used. This will > help when we have future protocol updates. Otherwise any protocol migration > might break auditing if it largely relies on CRC including system bytes. > > I did some test to understand the performance overhead of having a separate > user bytes CRC, > > On my desktop, it took ~4.7 seconds to compute CRC for 10G bytes. So for > each megabyte it takes 0.47 ms. > To compare with compression cost, I used the compressor used by producer. > The test uses GZIP and compress 1MB of data for each batch. > It takes: > ~90 seconds to compress 10GB bytes that contains all same byte. > ~470 seconds to compress 10GB bytes containing random bytes. > >1000 seconds to compress 10GB bytes with 4, 8, 16, 32 and 64 patterns. (I > guess it takes time to update the pattern mapping) > > So the overhead of a separate CRC computing is <1% considering the producer > do a lot of things other than compression. > > I am not sure if this overhead is worth taking or not, because we do have a > huge number of messages to send, so any overhead should be avoided if > possible. > > BTW. Another interesting finding is that the Crc32 class used in clients > which used to be faster than java CRC32 in Java 1.6. It seems no longer the > case. What I see is that Java CRC32 class is 2x faster than the Crc32 class > we are using now. > > Thanks, > > Jiangjie (Becket) Qin > > > On Fri, Jan 29, 2016 at 1:46 PM, Anna Povzner wrote: > > > Joel, thanks for your feedback. I updated the wiki based on your comments > > about the wiki writeup. > > > > > > On Fri, Jan 29, 2016 at 11:50 AM, Anna Povzner > wrote: > > > > > Becket, > > > > > > In your scenario with one message from producer A and one message from > > > producer B, those are two different messages, and they should be > tracked > > as > > > two different messages. So I would argue for using record CRC -- CRC > that > > > is actually used by the system + it will not require computing a > > different > > > CRC again which will add performance overhead. > > > > > > If the broker ever changes the CRC, the scenarios when that happens > > should > > > be very well defined. As far as I know, the scenarios when CRC is > > > overwritten by the broker (including KIP-31/32 changes): > > > -- topic config is LogAppendTime for timestamp type > > > -- upgrade/downgrade > > > -- compression codec change (which could be inferred from config). > > > > > > Monitoring/audit just needs to know when CRCs are safe to use, which is > > > most often is known from config. In the future, this can be further > > > addressed by broker interceptors. 
> > > > > > Thanks, > > > Anna > > > > > > > > > > > > > > > On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin > > wrote: > > > > > >> Neha, > > >> > > >> CRC is definitely an important type of metadata of a record. I am not > > >> arguing about that. But I think we should distinguish between two > types > > of > > >> checksum here, 1) the checksum of user data. and 2) the checksum > > including > > >> system appended bytes. > > >> > > >> I completely agree that (1) is good to add. But I am not sure if we > > should > > >> expose (2) to user, because this means any underlying protocol change > > will > > >> give a different CRC for exact same message. For example, let's say > > >> producer A is sending message with timestamp. Producer B is sending > > >> message > > >> without timestamp. Even they are given the exact same message, the CRC > > >> returned would be different. > > >> > > >> Also, Kafka broker will modify the system appended bytes in different > > >> scenarios, such as compression codec change, message format > > >> conversion(After KIP-31 and KIP-32). > > >> > > >> So my concern is that we are exposing CRC which including system > > appended > > >> bytes to user. > > >> > > >> Other than this I think everything looks good. Nice work, Anna. > > >> > > >> Thanks, > > >> > > >> Jiangjie (Becket) Qin > > >> > > >> On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy > > wrote: > > >> > > >> > Responding to some of the earlier comments in the thread: > > >> > > > >> > @Jay/@Neha, > > >> > > > >> > I think any one
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi Anna, I think there is value if a CRC over only the user bytes can be used. This will help when we have future protocol updates. Otherwise any protocol migration might break auditing if it largely relies on a CRC that includes system bytes. I did some tests to understand the performance overhead of having a separate user-bytes CRC. On my desktop, it took ~4.7 seconds to compute the CRC of 10GB of data, i.e. about 0.47 ms per megabyte. To compare with compression cost, I used the compressor used by the producer. The test uses GZIP and compresses 1MB of data per batch. It takes: ~90 seconds to compress 10GB of identical bytes. ~470 seconds to compress 10GB of random bytes. >1000 seconds to compress 10GB of bytes with 4-, 8-, 16-, 32-, and 64-byte patterns. (I guess it takes time to update the pattern mapping.) So the overhead of computing a separate CRC is <1%, considering the producer does a lot of things other than compression. I am not sure whether this overhead is worth taking, because we have a huge number of messages to send, so any overhead should be avoided if possible. BTW, another interesting finding concerns the Crc32 class used in the clients, which used to be faster than Java's CRC32 in Java 1.6. That no longer seems to be the case: what I see is that the Java CRC32 class is 2x faster than the Crc32 class we are using now. Thanks, Jiangjie (Becket) Qin On Fri, Jan 29, 2016 at 1:46 PM, Anna Povzner wrote: > Joel, thanks for your feedback. I updated the wiki based on your comments > about the wiki writeup. > > > On Fri, Jan 29, 2016 at 11:50 AM, Anna Povzner wrote: > > > Becket, > > > > In your scenario with one message from producer A and one message from > > producer B, those are two different messages, and they should be tracked > as > > two different messages. So I would argue for using record CRC -- CRC that > > is actually used by the system + it will not require computing a > different > > CRC again which will add performance overhead. > > > > If the broker ever changes the CRC, the scenarios when that happens > should > > be very well defined. As far as I know, the scenarios when CRC is > > overwritten by the broker (including KIP-31/32 changes): > > -- topic config is LogAppendTime for timestamp type > > -- upgrade/downgrade > > -- compression codec change (which could be inferred from config). > > > > Monitoring/audit just needs to know when CRCs are safe to use, which is > > most often is known from config. In the future, this can be further > > addressed by broker interceptors. > > > > Thanks, > > Anna > > > > > > > > > > On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin > wrote: > > > >> Neha, > >> > >> CRC is definitely an important type of metadata of a record. I am not > >> arguing about that. But I think we should distinguish between two types > of > >> checksum here, 1) the checksum of user data. and 2) the checksum > including > >> system appended bytes. > >> > >> I completely agree that (1) is good to add. But I am not sure if we > should > >> expose (2) to user, because this means any underlying protocol change > will > >> give a different CRC for exact same message. For example, let's say > >> producer A is sending message with timestamp. Producer B is sending > >> message > >> without timestamp. Even they are given the exact same message, the CRC > >> returned would be different. > >> > >> Also, Kafka broker will modify the system appended bytes in different > >> scenarios, such as compression codec change, message format > >> conversion(After KIP-31 and KIP-32). 
> >> > >> So my concern is that we are exposing CRC which including system > appended > >> bytes to user. > >> > >> Other than this I think everything looks good. Nice work, Anna. > >> > >> Thanks, > >> > >> Jiangjie (Becket) Qin > >> > >> On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy > wrote: > >> > >> > Responding to some of the earlier comments in the thread: > >> > > >> > @Jay/@Neha, > >> > > >> > I think any one of onCommit/onAppend/onArrival would work for the > >> concrete > >> > use-case that I had outlined. I think onArrival is additionally useful > >> for > >> > custom validation - i.e., reject the message and do not append if it > >> > violates some cluster-specific rule (for e.g., if some header > timestamp > >> is > >> > older than xyz). However, the thing with user-supplied validation is > we > >> > would have to do with a (new) generic error code in the producer > >> response. > >> > While there is a risk of a broker interceptor having high latency I > >> think > >> > that is acceptable since it is the user's responsibility to ensure low > >> > latency - the producer call-back and onAcknowledgment interceptor are > >> > similar in this regard although those are less risky. Even so, I think > >> > there are clear use-cases for broker interceptors so I feel the risk > >> part > >> > is something that just needs to be documented.
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Joel, thanks for your feedback. I updated the wiki based on your comments about the writeup. On Fri, Jan 29, 2016 at 11:50 AM, Anna Povzner wrote: > Becket, > > In your scenario with one message from producer A and one message from > producer B, those are two different messages, and they should be tracked as > two different messages. So I would argue for using record CRC -- CRC that > is actually used by the system + it will not require computing a different > CRC again which will add performance overhead. > > If the broker ever changes the CRC, the scenarios when that happens should > be very well defined. As far as I know, the scenarios when CRC is > overwritten by the broker (including KIP-31/32 changes): > -- topic config is LogAppendTime for timestamp type > -- upgrade/downgrade > -- compression codec change (which could be inferred from config). > > Monitoring/audit just needs to know when CRCs are safe to use, which is > most often is known from config. In the future, this can be further > addressed by broker interceptors. > > Thanks, > Anna > > > > > On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin wrote: > >> Neha, >> >> CRC is definitely an important type of metadata of a record. I am not >> arguing about that. But I think we should distinguish between two types of >> checksum here, 1) the checksum of user data. and 2) the checksum including >> system appended bytes. >> >> I completely agree that (1) is good to add. But I am not sure if we should >> expose (2) to user, because this means any underlying protocol change will >> give a different CRC for exact same message. For example, let's say >> producer A is sending message with timestamp. Producer B is sending >> message >> without timestamp. Even they are given the exact same message, the CRC >> returned would be different. >> >> Also, Kafka broker will modify the system appended bytes in different >> scenarios, such as compression codec change, message format >> conversion(After KIP-31 and KIP-32). >> >> So my concern is that we are exposing CRC which including system appended >> bytes to user. >> >> Other than this I think everything looks good. Nice work, Anna. >> >> Thanks, >> >> Jiangjie (Becket) Qin >> >> On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy wrote: >> >> > Responding to some of the earlier comments in the thread: >> > >> > @Jay/@Neha, >> > >> > I think any one of onCommit/onAppend/onArrival would work for the >> concrete >> > use-case that I had outlined. I think onArrival is additionally useful >> for >> > custom validation - i.e., reject the message and do not append if it >> > violates some cluster-specific rule (for e.g., if some header timestamp >> is >> > older than xyz). However, the thing with user-supplied validation is we >> > would have to do with a (new) generic error code in the producer >> response. >> > While there is a risk of a broker interceptor having high latency I >> think >> > that is acceptable since it is the user's responsibility to ensure low >> > latency - the producer call-back and onAcknowledgment interceptor are >> > similar in this regard although those are less risky. Even so, I think >> > there are clear use-cases for broker interceptors so I feel the risk >> part >> > is something that just needs to be documented. 
@Jay that is a good point >> > about moving from Message/MessageSet to Records although that may be >> less >> > difficult to absorb since it is a broker-side interceptor and so people >> > don't need to get a ton of applications in their company to switch to >> use >> > it. >> > >> > Re: onEnqueued: monitoring serialization latency can be done via metrics >> > but this is more useful for recording whether serialization succeeded or >> > not. onAcknowledgment subsumes this but it also subsumes other possible >> > errors (such as produce errors). It is more fine-grained than most >> people >> > need though (i.e., I don't think we will use it even if it is present.) >> > >> > Re: checksums: I think it is a good addition to metadata; and for >> > low-volume or super-critical topics can be used for very strict >> auditing. >> > >> > There are a couple of typos/edits for the wiki itself: >> > >> >- Under Kafka Producer changes: >> >- you have references to KafkaConsumer constructor and >> > ConsumerConfig.originals. >> > - sendRecord -> sentRecord (may be clearer) >> >- Under ProducerInterceptor interface: there is a mention of >> onEnqueued >> >which was rejected >> >- Comment for ConsumerRecord.record should probably be: // NEW: >> record >> >size in bytes (*after decompression*) >> > >> > >> > BTW - Anna, nice work on the KIP! >> > >> > Joel >> > >> > On Fri, Jan 29, 2016 at 6:57 AM, Neha Narkhede >> wrote: >> > >> > > Becket, >> > > >> > > Is your concern the presence of CRC in the RecordMetadata or do you >> want >> > to >> > > brainstorm how CRC can be used for
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Neha, CRC is definitely an important type of metadata of a record. I am not arguing about that. But I think we should distinguish between two types of checksum here: 1) the checksum of the user data, and 2) the checksum including system-appended bytes. I completely agree that (1) is good to add. But I am not sure if we should expose (2) to the user, because this means any underlying protocol change will give a different CRC for the exact same message. For example, let's say producer A is sending messages with a timestamp and producer B is sending messages without one. Even if they are given the exact same message, the CRCs returned would be different. Also, the Kafka broker will modify the system-appended bytes in different scenarios, such as a compression codec change or message format conversion (after KIP-31 and KIP-32). So my concern is that we are exposing a CRC that includes system-appended bytes to the user. Other than this I think everything looks good. Nice work, Anna. Thanks, Jiangjie (Becket) Qin On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy wrote: > Responding to some of the earlier comments in the thread: > > @Jay/@Neha, > > I think any one of onCommit/onAppend/onArrival would work for the concrete > use-case that I had outlined. I think onArrival is additionally useful for > custom validation - i.e., reject the message and do not append if it > violates some cluster-specific rule (for e.g., if some header timestamp is > older than xyz). However, the thing with user-supplied validation is we > would have to do with a (new) generic error code in the producer response. > While there is a risk of a broker interceptor having high latency I think > that is acceptable since it is the user's responsibility to ensure low > latency - the producer call-back and onAcknowledgment interceptor are > similar in this regard although those are less risky. Even so, I think > there are clear use-cases for broker interceptors so I feel the risk part > is something that just needs to be documented. @Jay that is a good point > about moving from Message/MessageSet to Records although that may be less > difficult to absorb since it is a broker-side interceptor and so people > don't need to get a ton of applications in their company to switch to use > it. > > Re: onEnqueued: monitoring serialization latency can be done via metrics > but this is more useful for recording whether serialization succeeded or > not. onAcknowledgment subsumes this but it also subsumes other possible > errors (such as produce errors). It is more fine-grained than most people > need though (i.e., I don't think we will use it even if it is present.) > > Re: checksums: I think it is a good addition to metadata; and for > low-volume or super-critical topics can be used for very strict auditing. > > There are a couple of typos/edits for the wiki itself: > >- Under Kafka Producer changes: >- you have references to KafkaConsumer constructor and > ConsumerConfig.originals. > - sendRecord -> sentRecord (may be clearer) >- Under ProducerInterceptor interface: there is a mention of onEnqueued >which was rejected >- Comment for ConsumerRecord.record should probably be: // NEW: record >size in bytes (*after decompression*) > > > BTW - Anna, nice work on the KIP! > > Joel > > On Fri, Jan 29, 2016 at 6:57 AM, Neha Narkhede wrote: > > > Becket, > > > > Is your concern the presence of CRC in the RecordMetadata or do you want > to > > brainstorm how CRC can be used for auditing? 
I think we shouldn't try to > > think about the various ways that people can do monitoring using > > interceptors and the metadata we provide. The entire point of having > > pluggable interceptors is so that people can employ their own creative > > mechanisms to make use of interceptors. > > > > I do think that it is worth discussing whether or not CRC makes sense as > > record metadata to the user. My take is that the CRC is the best > size-bound > > summary of serialized record content available to us which is expensive > to > > recompute if the user were to redo it. I'd argue this summary of a record > > qualifies as its metadata. After all, we use the record CRC for a very > > important test of the system durability as it travels through the system. > > > > 1. Isn't the TopicPartition + Offset already uniquely identified a > message? > > > It seems better than CRC no matter from summary point of view or > auditing > > > point of view. > > > > > > The offset is a system-assigned value of uniqueness to the message. If > you > > trusted the system that much, you are not looking to monitor it > out-of-band > > :-) > > > > > > > 2. Currently CRC only has 4 bytes. So it will have collision when there > > are > > > more than ~4 billion messages. Take LinkedIn as an example, we have 1.3 > > > trillion messages per day. So there will be at least a couple of > hundreds > > > collision for each CRC code every day, whereas
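[Editor's illustration, not part of the KIP] As a minimal sketch of Becket's type (1), a checksum over user bytes only: the hypothetical helper below computes a CRC over the serialized key and value, so it is unaffected by system-appended fields such as the timestamp or attributes.

```java
import java.util.zip.CRC32;

// Hypothetical helper: a checksum over user bytes only (Becket's type 1).
// Unlike the record CRC, it would not change if the broker rewrites
// system-appended fields (timestamp type, compression codec, format version).
public final class UserDataChecksum {
    public static long of(byte[] serializedKey, byte[] serializedValue) {
        CRC32 crc = new CRC32();
        if (serializedKey != null) crc.update(serializedKey, 0, serializedKey.length);
        if (serializedValue != null) crc.update(serializedValue, 0, serializedValue.length);
        return crc.getValue();
    }
}
```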
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Becket, In your scenario with one message from producer A and one message from producer B, those are two different messages, and they should be tracked as two different messages. So I would argue for using the record CRC -- the CRC that is actually used by the system -- which also avoids the performance overhead of computing a second CRC. If the broker ever changes the CRC, the scenarios when that happens should be very well defined. As far as I know, the scenarios when the CRC is overwritten by the broker (including KIP-31/32 changes) are: -- topic config is LogAppendTime for timestamp type -- upgrade/downgrade -- compression codec change (which could be inferred from config). Monitoring/audit just needs to know when CRCs are safe to use, which is most often known from config. In the future, this can be further addressed by broker interceptors. Thanks, Anna On Fri, Jan 29, 2016 at 11:30 AM, Becket Qin wrote: > Neha, > > CRC is definitely an important type of metadata of a record. I am not > arguing about that. But I think we should distinguish between two types of > checksum here, 1) the checksum of user data. and 2) the checksum including > system appended bytes. > > I completely agree that (1) is good to add. But I am not sure if we should > expose (2) to user, because this means any underlying protocol change will > give a different CRC for exact same message. For example, let's say > producer A is sending message with timestamp. Producer B is sending message > without timestamp. Even they are given the exact same message, the CRC > returned would be different. > > Also, Kafka broker will modify the system appended bytes in different > scenarios, such as compression codec change, message format > conversion(After KIP-31 and KIP-32). > > So my concern is that we are exposing CRC which including system appended > bytes to user. > > Other than this I think everything looks good. Nice work, Anna. > > Thanks, > > Jiangjie (Becket) Qin > > On Fri, Jan 29, 2016 at 8:11 AM, Joel Koshy wrote: > > > Responding to some of the earlier comments in the thread: > > > > @Jay/@Neha, > > > > I think any one of onCommit/onAppend/onArrival would work for the > concrete > > use-case that I had outlined. I think onArrival is additionally useful > for > > custom validation - i.e., reject the message and do not append if it > > violates some cluster-specific rule (for e.g., if some header timestamp > is > > older than xyz). However, the thing with user-supplied validation is we > > would have to do with a (new) generic error code in the producer > response. > > While there is a risk of a broker interceptor having high latency I think > > that is acceptable since it is the user's responsibility to ensure low > > latency - the producer call-back and onAcknowledgment interceptor are > > similar in this regard although those are less risky. Even so, I think > > there are clear use-cases for broker interceptors so I feel the risk part > > is something that just needs to be documented. @Jay that is a good point > > about moving from Message/MessageSet to Records although that may be less > > difficult to absorb since it is a broker-side interceptor and so people > > don't need to get a ton of applications in their company to switch to use > > it. > > > > Re: onEnqueued: monitoring serialization latency can be done via metrics > > but this is more useful for recording whether serialization succeeded or > > not. 
onAcknowledgment subsumes this but it also subsumes other possible > > errors (such as produce errors). It is more fine-grained than most people > > need though (i.e., I don't think we will use it even if it is present.) > > > > Re: checksums: I think it is a good addition to metadata; and for > > low-volume or super-critical topics can be used for very strict auditing. > > > > There are a couple of typos/edits for the wiki itself: > > > >- Under Kafka Producer changes: > >- you have references to KafkaConsumer constructor and > > ConsumerConfig.originals. > > - sendRecord -> sentRecord (may be clearer) > >- Under ProducerInterceptor interface: there is a mention of > onEnqueued > >which was rejected > >- Comment for ConsumerRecord.record should probably be: // NEW: record > >size in bytes (*after decompression*) > > > > > > BTW - Anna, nice work on the KIP! > > > > Joel > > > > On Fri, Jan 29, 2016 at 6:57 AM, Neha Narkhede > wrote: > > > > > Becket, > > > > > > Is your concern the presence of CRC in the RecordMetadata or do you > want > > to > > > brainstorm how CRC can be used for auditing? I think we shouldn't try > to > > > think about the various ways that people can do monitoring using > > > interceptors and the metadata we provide. The entire point of having > > > pluggable interceptors is so that people can employ their own creative > > > mechanisms to make use of interceptors. > > > > > > I do think
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
I am +1 on #1.2 and #3. #2: Regarding the CRC, I am not sure whether users care about it. Is there any specific use case? Currently we validate messages by calling ensureValid() to verify the checksum and throw an exception if it does not match. Message size would be useful. We can add that to ConsumerRecord. Can you clarify the message size you are referring to? Does it include the message header overhead or not? From the user's point of view, they probably don't care about the header size. Thanks, Jiangjie (Becket) Qin On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede wrote: > Anna, > > Thanks for being diligent. > > +1 on #1.2 and sounds good on #3. I recommend adding checksum and size > fields to RecordMetadata and ConsumerRecord instead of exposing metadata > piecemeal in the interceptor APIs. > > Thanks, > Neha > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner wrote: > > > Hi All, > > > > The KIP wiki page is now up-to-date with the scope we have agreed on: > > Producer and Consumer Interceptors with a minimal set of mutable API that > > are not dependent on producer and consumer internal implementation. > > > > I have few more API details that I would like to bring attention to > or/and > > discuss: > > > > 1. Handling exceptions > > > > Exceptions can provide an additional level of control. For example, we > can > > filter messages on consumer side or stop messages on producer if they > don’t > > have the right field. > > > > I see two options: > > 1.1. For callbacks that can mutate records (onSend and onConsume), > > propagate exceptions through the original calls (KafkaProducer.send() and > > KafkaConsumer.poll() respectively). For other callbacks, catch exception, > > log, and ignore. > > 1.2. Catch exceptions from all the interceptor callbacks and ignore. > > > > The issue with 1.1. is that it effectively changes KafkaProducer.send() > and > > KafkaConsumer.poll() API, since now they may throw exceptions that are > not > > documented in KafkaProducer/Consumer API. Another option is to allow to > > propagate some exceptions, and ignore others. > > > > I think our use-cases do not require propagating exceptions. So, I > propose > > option 1.2. Unless someone has suggestion/use-cases for propagating > > exceptions. Please let me know. > > > > 2. Intercepting record CRC and record size > > > > Since we decided not to add any intermediate callbacks (such as onEnqueue > > or onReceive) to interceptors, I think it is still valuable to intercept > > record CRC and record size in bytes for monitoring and audit use-cases. > > > > I propose to add checksum and size fields to RecordMetadata and > > ConsumerRecord. Another option would be to add them as parameters in > > onAcknowledgement() and onConsume() callbacks. > > > > 3. Callbacks that allow to modify records look as follows: > > ProducerRecord onSend(ProducerRecord record); > > ConsumerRecords onConsume(ConsumerRecords records); > > > > This means that interceptors can potentially modify topic/partition in > > ProducerRecord and topic/partition/offset in ConsumerRecord. I propose > that > > it is up to the interceptor implementation to ensure that > topic/partition, > > etc is correct. KafkaProducer.send() will use topic, partition, key, and > > value from ProducerRecord returned from the onSend(). Similarly, > > ConsumerRecords returned from KafkaConsumer.poll() would be the ones > > returned from the interceptor. > > > > > > > > Please let me know if you have any suggestions or objections to the > above. 
> > > > Thanks, > > Anna > > > > > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner wrote: > > > > > Hi Mayuresh, > > > > > > I see why you would want to check for messages left in the > > > RecordAccumulator. However, I don't think this will completely solve > the > > > problem. Messages could be in-flight somewhere else, like in the > socket, > > or > > > there maybe in-flight messages on the consumer side of the MirrorMaker. > > So, > > > if we go the route of checking whether there are any in-flight messages > > for > > > topic deletion use-case, maybe it is better count them with onSend() > and > > > onAcknowledge() -- whether all messages sent were acknowledged. I also > > > think that it would be better to solve this without interceptors, such > as > > > fix error handling in this scenario. However, I do not have any good > > > proposal right now, so these are just general thoughts. > > > > > > Thanks, > > > Anna > > > > > > > > > > > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat < > > > gharatmayures...@gmail.com> wrote: > > > > > >> Calling producer.flush(), flushes all the data. So this is OK. But > when > > >> you > > >> are running Mirror maker, I am not sure there is a way to flush() from > > >> outside. > > >> > > >> > > >> Thanks, > > >> > > >> Mayuresh > > >> > > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin > > >> wrote: > > >> > >
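[Editor's illustration] To make option 1.2 from Anna's quoted proposal concrete, here is a sketch of the catch-log-and-continue behavior, assuming the onSend signature proposed in the KIP. The wrapper class below is invented for illustration; it is not actual producer code.

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// Option 1.2: interceptor failures are swallowed and logged, never propagated
// as exceptions from KafkaProducer.send().
public class SafeOnSend {
    interface ProducerInterceptor<K, V> {                       // signature as proposed in the KIP
        ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    }

    static <K, V> ProducerRecord<K, V> apply(ProducerInterceptor<K, V> interceptor,
                                             ProducerRecord<K, V> record) {
        try {
            ProducerRecord<K, V> intercepted = interceptor.onSend(record);
            return intercepted != null ? intercepted : record;  // guard against a buggy interceptor
        } catch (Exception e) {
            // swallow and log; an interceptor bug must not break the send path
            System.err.println("Error executing interceptor onSend callback: " + e);
            return record;
        }
    }
}
```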
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi Becket, The use-case for the CRC is end-to-end audit, rather than checking whether a single message is corrupt or not. Regarding record size, I was thinking of extracting the record size from Record; that will include the header overhead as well. I think the total record size will tell users how much bandwidth their messages take. Since the header is relatively small and constant, users will also get an idea of their key/value sizes. Thanks, Anna On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin wrote: > I am +1 on #1.2 and #3. > > #2: Regarding CRC, I am not sure if users care about CRC. is there any > specific use case? Currently we validate messages by calling ensureValid() > to verify the checksum and throw exception if it does not match. > > Message size would be useful. We can add that to ConsumerRecord. Can you > clarify the message size you are referring to? Does it include the message > header overhead or not? From user's point of view, they probably don't care > about header size. > > Thanks, > > Jiangjie (Becket) Qin > > > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede wrote: > > > Anna, > > > > Thanks for being diligent. > > > > +1 on #1.2 and sounds good on #3. I recommend adding checksum and size > > fields to RecordMetadata and ConsumerRecord instead of exposing metadata > > piecemeal in the interceptor APIs. > > > > Thanks, > > Neha > > > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner wrote: > > > > > Hi All, > > > > > > The KIP wiki page is now up-to-date with the scope we have agreed on: > > > Producer and Consumer Interceptors with a minimal set of mutable API > that > > > are not dependent on producer and consumer internal implementation. > > > > > > I have few more API details that I would like to bring attention to > > or/and > > > discuss: > > > > > > 1. Handling exceptions > > > > > > Exceptions can provide an additional level of control. For example, we > > can > > > filter messages on consumer side or stop messages on producer if they > > don’t > > > have the right field. > > > > > > I see two options: > > > 1.1. For callbacks that can mutate records (onSend and onConsume), > > > propagate exceptions through the original calls (KafkaProducer.send() > and > > > KafkaConsumer.poll() respectively). For other callbacks, catch > exception, > > > log, and ignore. > > > 1.2. 
Callbacks that allow to modify records look as follows: > > > ProducerRecord onSend(ProducerRecord record); > > > ConsumerRecords onConsume(ConsumerRecords records); > > > > > > This means that interceptors can potentially modify topic/partition in > > > ProducerRecord and topic/partition/offset in ConsumerRecord. I propose > > that > > > it is up to the interceptor implementation to ensure that > > topic/partition, > > > etc is correct. KafkaProducer.send() will use topic, partition, key, > and > > > value from ProducerRecord returned from the onSend(). Similarly, > > > ConsumerRecords returned from KafkaConsumer.poll() would be the ones > > > returned from the interceptor. > > > > > > > > > > > > Please let me know if you have any suggestions or objections to the > > above. > > > > > > Thanks, > > > Anna > > > > > > > > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner > wrote: > > > > > > > Hi Mayuresh, > > > > > > > > I see why you would want to check for messages left in the > > > > RecordAccumulator. However, I don't think this will completely solve > > the > > > > problem. Messages could be in-flight somewhere else, like in the > > socket, > > > or > > > > there maybe in-flight messages on the consumer side of the > MirrorMaker. > > > So, > > > > if we go the route of checking whether there are any in-flight > messages > > > for > > > > topic deletion use-case, maybe it is better count them with onSend() > > and > > > > onAcknowledge() -- whether all
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Anna, Using the CRC to do end-to-end auditing might be very costly, because you will need to collect all the CRCs from both producer and consumer. And it is based on the assumption that the broker does not modify the record. Can you shed some light on how end-to-end auditing would use the CRC before we decide to expose such a low-level detail to the end user? It would also be helpful if you could compare it with something like sequence-number-based auditing. About the record size, one thing worth noting is that the size of a Record is not the actual bytes sent over the wire if we use compression, so it does not really tell the user how much bandwidth they are using. Personally I think two kinds of size may be useful. 1. The record size after serialization, i.e. application bytes. (The uncompressed record size can be easily derived as well.) 2. The actual bytes sent over the wire. We can get (1) easily, but (2) is difficult to get at the Record level when we use compression. Thanks, Jiangjie (Becket) Qin On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner wrote: > Hi Becket, > > The use-case for CRC is end-to-end audit, rather than checking whether a > single message is corrupt or not. > > Regarding record size, I was thinking to extract record size from Record. > That will include header overhead as well. I think total record size will > tell users how much bandwidth their messages take. Since header is > relatively small and constant, users also will get an idea of their > key/value sizes. > > Thanks, > Anna > > On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin wrote: > > > I am +1 on #1.2 and #3. > > > > #2: Regarding CRC, I am not sure if users care about CRC. is there any > > specific use case? Currently we validate messages by calling > ensureValid() > > to verify the checksum and throw exception if it does not match. > > > > Message size would be useful. We can add that to ConsumerRecord. Can you > > clarify the message size you are referring to? Does it include the > message > > header overhead or not? From user's point of view, they probably don't > care > > about header size. > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > > > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede > wrote: > > > > > Anna, > > > > > > Thanks for being diligent. > > > > > > +1 on #1.2 and sounds good on #3. I recommend adding checksum and size > > > fields to RecordMetadata and ConsumerRecord instead of exposing > metadata > > > piecemeal in the interceptor APIs. > > > > > > Thanks, > > > Neha > > > > > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner > wrote: > > > > > > > Hi All, > > > > > > > > The KIP wiki page is now up-to-date with the scope we have agreed on: > > > > Producer and Consumer Interceptors with a minimal set of mutable API > > that > > > > are not dependent on producer and consumer internal implementation. > > > > > > > > I have few more API details that I would like to bring attention to > > > or/and > > > > discuss: > > > > > > > > 1. Handling exceptions > > > > > > > > Exceptions can provide an additional level of control. For example, > we > > > can > > > > filter messages on consumer side or stop messages on producer if they > > > don’t > > > > have the right field. > > > > > > > > I see two options: > > > > 1.1. For callbacks that can mutate records (onSend and onConsume), > > > > propagate exceptions through the original calls (KafkaProducer.send() > > and > > > > KafkaConsumer.poll() respectively). For other callbacks, catch > > exception, > > > > log, and ignore. > > > > 1.2. 
Catch exceptions from all the interceptor callbacks and ignore. > > > > > > > > The issue with 1.1. is that it effectively changes > KafkaProducer.send() > > > and > > > > KafkaConsumer.poll() API, since now they may throw exceptions that > are > > > not > > > > documented in KafkaProducer/Consumer API. Another option is to allow > to > > > > propagate some exceptions, and ignore others. > > > > > > > > I think our use-cases do not require propagating exceptions. So, I > > > propose > > > > option 1.2. Unless someone has suggestion/use-cases for propagating > > > > exceptions. Please let me know. > > > > > > > > 2. Intercepting record CRC and record size > > > > > > > > Since we decided not to add any intermediate callbacks (such as > > onEnqueue > > > > or onReceive) to interceptors, I think it is still valuable to > > intercept > > > > record CRC and record size in bytes for monitoring and audit > use-cases. > > > > > > > > I propose to add checksum and size fields to RecordMetadata and > > > > ConsumerRecord. Another option would be to add them as parameters in > > > > onAcknowledgement() and onConsume() callbacks. > > > > > > > > 3. Callbacks that allow to modify records look as follows: > > > > ProducerRecord onSend(ProducerRecord record); > > > > ConsumerRecords onConsume(ConsumerRecords records); > > > > > > > > This means that
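For readers unfamiliar with the alternative Becket mentions, here is a minimal sketch of what count-based (sequence-style) audit state could look like on either end of the pipeline. The class and all names in it are illustrative, not part of the KIP: both a producer-side and a consumer-side component would maintain one of these and publish the per-bucket counts to an audit topic for reconciliation.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class CountingAuditor {
    private static final long BUCKET_MS = 10 * 60 * 1000L; // 10-minute buckets

    // topic -> time bucket -> number of records observed
    private final Map<String, Map<Long, LongAdder>> counts = new ConcurrentHashMap<>();

    public void record(String topic, long timestampMs) {
        long bucket = timestampMs - (timestampMs % BUCKET_MS);
        counts.computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
              .computeIfAbsent(bucket, b -> new LongAdder())
              .increment();
    }
}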
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Regarding record size as bytes sent over the wire, my concern is that it is almost impossible to calculate per record. We could do it as: 1) compressed bytes divided by the number of records in a compressed message, as Todd mentioned; or 2) the same as #1 but apportioned proportionally to each record's uncompressed size vs. the total uncompressed size of the records. All of these calculations give us an estimate. So maybe record size as bytes sent over the wire is not per-record metadata, but rather a per-topic/partition measure that is better exposed through metrics?
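To make the two estimates concrete, a small sketch (purely illustrative; neither estimate is exact, which is the point being made):

public final class WireSizeEstimates {
    // Option 1: split the compressed batch bytes evenly across its records.
    static double evenSplit(long compressedBatchBytes, int recordCount) {
        return (double) compressedBatchBytes / recordCount;
    }

    // Option 2: apportion compressed bytes by each record's share of the
    // uncompressed batch size.
    static double proportionalSplit(long compressedBatchBytes,
                                    long recordUncompressedBytes,
                                    long batchUncompressedBytes) {
        return compressedBatchBytes
                * ((double) recordUncompressedBytes / batchUncompressedBytes);
    }
}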
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
It may be difficult (or nearly impossible) to get actual compressed bytes for a message from a compressed batch, but I do think it’s useful information to have available for the very reason noted, bandwidth consumed. Does it make sense to have an interceptor at the batch level that can provide this? The other option is to estimate it (such as making an assumption that the messages in a batch are equal in size, which is not necessarily true), which is probably not the right answer.

-Todd
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi Becket,

It will be up to the interceptor to implement their audit or monitoring strategy. I would also imagine there is more than one good way to do audit. So, I agree that some of the interceptors may not use CRC, and we will not require it. The question now is whether intercepting CRCs is needed. I think they are very useful for monitoring and audit, because a CRC provides an easy way to get a summary of a message, rather than using message bytes or key/value objects.

Regarding record size, I agree that the bandwidth example was not a good one. I think it would be hard to get the actual bytes sent over the wire (your #2), since multiple records get compressed together and we would need to decide which bytes to account to which record. So I am inclined to only do your #1. However, it still makes more sense to me to just return the record size including the header, since this is the actual record size.

Thanks,
Anna
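As an illustration of the audit use Anna describes, a sketch of a producer interceptor that records the checksum as a compact per-record summary. It assumes the interceptor interface lands roughly as proposed in this thread and that a checksum field is added to RecordMetadata; AuditSink is a hypothetical collector, not a real API.

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ChecksumAuditInterceptor<K, V> implements ProducerInterceptor<K, V> {
    public void configure(Map<String, ?> configs) { }

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        return record; // pass through unmodified
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // metadata.checksum() is the field proposed in this thread;
        // AuditSink is a made-up stand-in for whatever audit backend is used.
        if (exception == null && metadata != null)
            AuditSink.append(metadata.topic(), metadata.partition(),
                             metadata.offset(), metadata.checksum());
    }

    public void close() { }
}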
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
On second thought, yes, I think we should expose a record size that represents application bytes. This is Becket's option #1. I updated the KIP wiki with the new fields in RecordMetadata and ConsumerRecord.

I would like to start a voting thread tomorrow if there are no objections or more things to discuss.

Thanks,
Anna
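On the consumer side, with the size fields Anna describes added to ConsumerRecord, application-byte accounting could look something like this sketch; the accessor names are assumptions based on the proposal:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public final class SizeAccounting {
    // Sums the proposed per-record size fields over one batch of records.
    public static long applicationBytes(ConsumerRecords<byte[], byte[]> records) {
        long total = 0;
        for (ConsumerRecord<byte[], byte[]> record : records)
            total += record.serializedKeySize() + record.serializedValueSize();
        return total;
    }
}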
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi All,

The KIP wiki page is now up-to-date with the scope we have agreed on: Producer and Consumer Interceptors with a minimal set of mutable APIs that are not dependent on the producer and consumer internal implementation.

I have a few more API details that I would like to bring attention to and/or discuss:

1. Handling exceptions

Exceptions can provide an additional level of control. For example, we can filter messages on the consumer side, or stop messages on the producer if they don’t have the right field.

I see two options:
1.1. For callbacks that can mutate records (onSend and onConsume), propagate exceptions through the original calls (KafkaProducer.send() and KafkaConsumer.poll() respectively). For other callbacks, catch the exception, log, and ignore.
1.2. Catch exceptions from all the interceptor callbacks and ignore them.

The issue with 1.1 is that it effectively changes the KafkaProducer.send() and KafkaConsumer.poll() API, since now they may throw exceptions that are not documented in the KafkaProducer/Consumer API. Another option is to propagate some exceptions and ignore others.

I think our use-cases do not require propagating exceptions, so I propose option 1.2, unless someone has suggestions/use-cases for propagating exceptions. Please let me know.

2. Intercepting record CRC and record size

Since we decided not to add any intermediate callbacks (such as onEnqueue or onReceive) to interceptors, I think it is still valuable to intercept record CRC and record size in bytes for monitoring and audit use-cases.

I propose adding checksum and size fields to RecordMetadata and ConsumerRecord. Another option would be to add them as parameters in the onAcknowledgement() and onConsume() callbacks.

3. Callbacks that allow modifying records look as follows:
ProducerRecord onSend(ProducerRecord record);
ConsumerRecords onConsume(ConsumerRecords records);

This means that interceptors can potentially modify topic/partition in ProducerRecord and topic/partition/offset in ConsumerRecord. I propose that it is up to the interceptor implementation to ensure that topic/partition, etc. is correct. KafkaProducer.send() will use the topic, partition, key, and value from the ProducerRecord returned from onSend(). Similarly, the ConsumerRecords returned from KafkaConsumer.poll() would be the ones returned from the interceptor.

Please let me know if you have any suggestions or objections to the above.

Thanks,
Anna
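For clarity, option 1.2 would make the producer's interceptor dispatch look roughly like this sketch: each onSend call is wrapped in a try/catch that logs and continues, so a buggy interceptor cannot fail KafkaProducer.send(). The surrounding class, the interceptors field, and the log field are assumed context, not a definitive implementation.

private ProducerRecord<K, V> applyOnSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> intercepted = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            intercepted = interceptor.onSend(intercepted);
        } catch (Exception e) {
            // Log and ignore: the send proceeds with the last good record.
            log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return intercepted;
}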
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi Mayuresh,

I see why you would want to check for messages left in the RecordAccumulator. However, I don't think this will completely solve the problem. Messages could be in flight somewhere else, like in the socket, or there may be in-flight messages on the consumer side of the MirrorMaker. So, if we go the route of checking whether there are any in-flight messages for the topic-deletion use-case, maybe it is better to count them with onSend() and onAcknowledgement() -- whether all messages sent were acknowledged. I also think that it would be better to solve this without interceptors, such as by fixing error handling in this scenario. However, I do not have any good proposal right now, so these are just general thoughts.

Thanks,
Anna
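A sketch of the counting idea (illustrative only, and it ignores some error-path bookkeeping): onSend() increments a per-topic counter and onAcknowledgement() decrements it, so a topic is quiescent once its counter reaches zero.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class InFlightCountingInterceptor<K, V> implements ProducerInterceptor<K, V> {
    private static final ConcurrentHashMap<String, AtomicLong> IN_FLIGHT =
            new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) { }

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        IN_FLIGHT.computeIfAbsent(record.topic(), t -> new AtomicLong()).incrementAndGet();
        return record;
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (metadata != null) {
            AtomicLong n = IN_FLIGHT.get(metadata.topic());
            if (n != null)
                n.decrementAndGet();
        }
    }

    public void close() { }

    // True when no sent record for the topic is still awaiting an ack.
    public static boolean quiescent(String topic) {
        AtomicLong n = IN_FLIGHT.get(topic);
        return n == null || n.get() == 0;
    }
}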
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Anna,

Thanks for being diligent.

+1 on #1.2 and sounds good on #3. I recommend adding checksum and size fields to RecordMetadata and ConsumerRecord instead of exposing metadata piecemeal in the interceptor APIs.

Thanks,
Neha
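One nice property of Neha's suggestion is that the new fields would be usable from an ordinary send callback too, with no interceptor involved. A fragment of what that might look like, assuming checksum() and the size accessors are added to RecordMetadata as proposed; producer, record, and monitor are assumed context:

producer.send(record, (metadata, exception) -> {
    // Callback is a single-method interface, so a Java 8 lambda works here.
    if (exception == null)
        monitor.observe(metadata.topic(), metadata.checksum(),
                        metadata.serializedKeySize() + metadata.serializedValueSize());
});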
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Mayuresh,

Regarding your use case about mirror maker: is it good enough as long as we know there are no more messages for the topic in the producer? If that is the case, calling producer.flush() is sufficient.

Thanks,

Jiangjie (Becket) Qin

On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat wrote:
> Hi Anna,
>
> Thanks a lot for summarizing the discussion on this kip.
>
> It LGTM.
> This is really nice :
> We decided not to add any callbacks to producer and consumer
> interceptors that will depend on internal implementation as part of this
> KIP.
> *However, it is possible to add them later as part of another KIP if there
> are good use-cases.*
>
> Do you agree with the use case I explained earlier for knowing the number
> of records left in the RecordAccumulator for a particular topic? It might
> be orthogonal to this KIP, but will be helpful. What do you think?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino wrote:
> > This looks good. As noted, having one mutable interceptor on each side
> > allows for the use cases we can envision right now, and I think that’s
> > going to provide a great deal of opportunity for implementing things like
> > audit, especially within a multi-tenant environment. Looking forward to
> > getting this available in the clients.
> >
> > Thanks!
> >
> > -Todd
> >
> > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner wrote:
> > > Hi All,
> > >
> > > Here are the meeting notes from today’s KIP meeting:
> > >
> > > 1. We agreed to keep the scope of this KIP to be producer and consumer
> > > interceptors only. A broker-side interceptor will be added later as a
> > > separate KIP. The reasons were already mentioned in this thread, but the
> > > summary is:
> > > * A broker interceptor is riskier and requires careful consideration about
> > > overheads, whether to intercept leaders vs. leaders/replicas, what to do on
> > > leader failover, and so on.
> > > * Broker interceptors increase monitoring resolution, but not including it
> > > in this KIP does not reduce the usefulness of producer and consumer
> > > interceptors that enable end-to-end monitoring.
> > >
> > > 2. We agreed to scope the ProducerInterceptor and ConsumerInterceptor
> > > callbacks to a minimal set of mutable APIs that are not dependent on
> > > producer and consumer internal implementation.
> > >
> > > ProducerInterceptor:
> > > *ProducerRecord onSend(ProducerRecord record);*
> > > *void onAcknowledgement(RecordMetadata metadata, Exception exception);*
> > >
> > > ConsumerInterceptor:
> > > *ConsumerRecords onConsume(ConsumerRecords records);*
> > > *void onCommit(Map offsets);*
> > >
> > > We will allow interceptors to modify ProducerRecord on the producer side,
> > > and modify ConsumerRecords on the consumer side. This will support
> > > end-to-end monitoring and auditing and the ability to add metadata to a
> > > message. This will support Todd’s auditing and routing use-cases.
> > >
> > > We did not find any use-case for modifying records in the onConsume()
> > > callback, but decided to enable modification of consumer records for
> > > symmetry with onSend().
> > >
> > > 3. We agreed to ensure compatibility when/if we add new methods to
> > > ProducerInterceptor and ConsumerInterceptor by using default methods with
> > > an empty implementation. OK to assume Java 8. (This is Ismael’s method #2.)
> > >
> > > 4. We decided not to add any callbacks to producer and consumer
> > > interceptors that will depend on internal implementation as part of this
> > > KIP. However, it is possible to add them later as part of another KIP if
> > > there are good use-cases.
> > >
> > > *Reasoning.* We did not have concrete use-cases that justified more
> > > methods at this point. Some of the use-cases were for more fine-grained
> > > latency collection, which could be done with Kafka Metrics. Another
> > > use-case was encryption. However, there are several design options for
> > > encryption. One is to do per-record encryption, which would require adding
> > > ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One
> > > could argue that in that case encryption could be done by adding a custom
> > > serializer/deserializer. Another option is to do encryption after the
> > > message gets compressed, but there are issues that arise regarding the
> > > broker doing re-compression. We decided that it is better to have that
> > > discussion in a separate KIP and decide whether this is something we want
> > > to do with interceptors or by other means.
> > >
> > > Todd, Mayuresh and others who missed the KIP meeting, please let me know
> > > your thoughts on the scope we agreed on during the meeting.
> > >
> > > I will update the KIP proposal with the current decision by end of today.
> > >
> > > Thanks,
> > > Anna
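In mirror-maker-like code that owns the producer, the flush-based check Becket describes amounts to a sketch like the following; producer.flush() is the real KafkaProducer API, while deleteDownstreamTopic is a made-up placeholder for the downstream admin action:

// Drain everything the producer has buffered (RecordAccumulator plus
// in-flight requests) before acting on the downstream topic.
producer.flush();   // blocks until all previously sent records complete
// Safe to delete topic T in cluster B now, provided no new sends for T
// are issued concurrently.
deleteDownstreamTopic("T");  // hypothetical admin call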
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Calling producer.flush() flushes all the data, so this is OK. But when you are running mirror maker, I am not sure there is a way to call flush() from outside.

Thanks,

Mayuresh
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi Anna and Neha,

I think it makes a lot of sense to try and keep the interface lean and to add more methods later when/if there is a need. What is the current thinking with regards to compatibility when/if we add new methods? A few options come to mind:

1. Change the interface to an abstract class with empty implementations for all the methods. This means that the path to adding new methods is clear.
2. Hope we have moved to Java 8 by the time we need to add new methods, and use default methods with an empty implementation for any new method (and potentially make existing methods default methods too at that point, for consistency).
3. Introduce a new interface that inherits from the existing Interceptor interface when we need to add new methods.

Option 1 is the easiest, and it also means that interceptor users only need to override the methods that they are interested in (more useful if the number of methods grows). The downside is that interceptor implementations cannot inherit from another class (a straightforward workaround is to make the interceptor a forwarder that calls another class). Also, our existing callbacks are interfaces, so this seems a bit inconsistent.

Option 2 may be the most appealing one, as both users and ourselves retain flexibility. The main downside is that it relies on us moving to Java 8, which may potentially be more than a year away (if we support the last 2 Java releases).

Thoughts?

Ismael

On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede wrote:
> Anna,
>
> I'm also in favor of including just the APIs for which we have a clear use
> case. If more use cases for finer monitoring show up in the future, we can
> always update the interface. Would you please highlight in the KIP the APIs
> that you think we have an immediate use for?
>
> Joel,
>
> Broker-side monitoring makes a lot of sense in the long term, though I don't
> think it is a requirement for end-to-end monitoring. With the producer and
> consumer interceptors, you have the ability to get full publish-to-subscribe
> end-to-end monitoring. The broker interceptor certainly improves the
> resolution of monitoring, but it is also a riskier change. I prefer an
> incremental approach over a big bang and recommend taking baby steps. Let's
> first make sure the producer/consumer interceptors are successful, and then
> come back and add the broker interceptor carefully.
>
> Having said that, it would be great to understand your proposal for the
> broker interceptor independently. We can either add an interceptor on-append
> or on-commit. If people want to use this for monitoring, then possibly
> on-commit might be more useful?
>
> Thanks,
> Neha
>
> On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps wrote:
> > Hey Joel,
> >
> > What is the interface you are thinking of? Something like this:
> > onAppend(String topic, int partition, Records records, long time)
> > ?
> >
> > One challenge right now is that we are still using the old
> > Message/MessageSet classes on the broker, which I'm not sure we'd want to
> > support over the long haul, but it might be okay just to create the Records
> > instance for this interface.
> >
> > -Jay
> >
> > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy wrote:
> > > I'm definitely in favor of having such hooks in the produce/consume
> > > life-cycle.
Not sure if people remember this but in Kafka 0.7 this was > > > pretty much how it was: > > > > > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala > > > i.e., we had something similar to the interceptor proposal for various > > > stages of the producer request. The producer provided call-backs for > > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at > > > LinkedIn we in fact did auditing within these call-backs (and not > > > explicitly in the wrapper). Over time and with 0.8 we moved that out to > > the > > > wrapper libraries. > > > > > > On a side-note while audit and other monitoring can be done internally > > in a > > > convenient way I think it should be clarified that having a wrapper is > in > > > general not a bad idea and I would even consider it to be a > > best-practice. > > > Even with 0.7 we still had a wrapper library and that API has largely > > > stayed the same and has helped protect against (sometimes backwards > > > incompatible) changes in open source. > > > > > > While we are on this topic I have one comment and Anna, you may have > > > already considered this but I don't see mention of it in the KIP: > > > > > > Add a custom message interceptor/validator on the broker on message > > > arrival. > > > > > > We decompress and do basic validation of messages on arrival. I think > > there > > > is value in supporting custom validation and expand it to support > custom > > > on-arrival processing. Here is a specific use-case I have in mind. The > > blog > > > that James referenced
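To make Ismael's option 2 concrete, a sketch of how a new callback could be added later without breaking existing implementations; onRetry is a made-up example method, and the interface shape here is only an approximation of the one under discussion:

// Java 8 interface evolution via a default method.
public interface ProducerInterceptor<K, V> {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    // Hypothetical future addition: the empty default keeps every existing
    // implementation source- and binary-compatible.
    default void onRetry(RecordMetadata metadata) { }
}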
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Finally got a chance to take a look at this. I won’t be able to make the KIP meeting due to a conflict.

I’m somewhat disappointed in this proposal. I think that the explicit exclusion of modification of the messages is short-sighted, and not accounting for it now is going to bite us later. Jay, aren’t you the one railing against public interfaces and how difficult they are to work with when you don’t get them right? The “simple” change to one of these interfaces to make it able to return a record is going to be a significant change and is going to require all clients to rewrite their interceptors. If we’re not willing to put in the time to think through manipulation now, then this KIP should be shelved until we are. Implementing something halfway is going to be worse than taking a little longer. In addition, I don’t believe that manipulation requires anything more than interceptors to receive the full record, and then to return it.

There are 3 use cases I can think of right now, without any deep discussion, that can make use of interceptors with modification:

1. Auditing. The ability to add metadata to a message for auditing is critical. Hostname, service name, timestamps, etc. are all pieces of data that can be used on the other side of the pipeline to categorize messages, determine loss and transport time, and pin down issues. You may say that these things can just be part of the message schema, but anyone who has worked with a multi-user data system (especially those who have been involved with LinkedIn) knows how difficult it is to maintain consistent message schemas and to get other people to put in fields for your use.

2. Encryption. This is probably the most obvious case for record manipulation on both sides. The ability to tie in end-to-end encryption is important for data that requires external compliance (PCI, HIPAA, etc.).

3. Routing. By being able to add a bit of information about the source or destination of a message to the metadata, you can easily construct an intelligent mirror maker that can prevent loops. This has the opportunity to result in significant operational savings, as you can get rid of the need for tiered clusters to prevent loops in mirroring messages.

All three of these share the feature that they add metadata to messages. With the pushback on having arbitrary metadata as an “envelope” on the message, this is a way to provide it and make it the responsibility of the client, not the Kafka broker and system itself.

-Todd
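As one possible shape of Todd's auditing case, a sketch of a mutable onSend that prepends a small host/timestamp envelope to the value. The envelope format, the audit.host config key, and the class name are purely illustrative; a real deployment would use a proper schema rather than a delimited string.

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AuditEnvelopeInterceptor implements ProducerInterceptor<byte[], byte[]> {
    private String host = "unknown";

    public void configure(Map<String, ?> configs) {
        Object h = configs.get("audit.host");   // made-up config key
        if (h != null) host = h.toString();
    }

    public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
        byte[] header = (host + "|" + System.currentTimeMillis() + "|")
                .getBytes(StandardCharsets.UTF_8);
        byte[] value = record.value();
        byte[] tagged = new byte[header.length + value.length];
        System.arraycopy(header, 0, tagged, 0, header.length);
        System.arraycopy(value, 0, tagged, header.length, value.length);
        // Preserve topic, partition, and key; only the value is rewritten.
        return new ProducerRecord<>(record.topic(), record.partition(),
                                    record.key(), tagged);
    }

    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }
    public void close() { }
}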
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Thanks Ismael and Todd for your feedback! I agree about coming up with lean, but useful interfaces that will be easy to extend later. When we discuss the minimal set of producer and consumer interceptor API in today’s KIP meeting (discussion item #2 in my previous email), lets compare two options: *1. Minimal set of immutable API for producer and consumer interceptors* ProducerInterceptor: public void onSend(ProducerRecordrecord); public void onAcknowledgement(RecordMetadata metadata, Exception exception); ConsumerInterceptor: public void onConsume(ConsumerRecords records); public void onCommit(Map offsets); Use-cases: — end-to-end monitoring; custom tracing and logging *2. Minimal set of mutable API for producer and consumer interceptors* ProducerInterceptor: ProducerRecord onSend(ProducerRecord record); void onAcknowledgement(RecordMetadata metadata, Exception exception); ConsumerInterceptor: void onConsume(ConsumerRecords records); void onCommit(Map offsets); Additional use-cases to #1: — Ability to add metadata to a message or fill in standard fields for audit and routing. Implications — Partition assignment will be done based on modified key/value instead of original key/value. If key/value transformation is not consistent (same key and value does not mutate to the same, but modified, key/value), then log compaction would not work. However, audit and routing use-cases from Todd will likely do consistent transformation. *Additional callbacks (discussion item #3 in my previous email):* If we want to support encryption, we would want to be able to modify serialized key/value, rather than key and value objects. This will add the following API to producer and consumer interceptors: ProducerInterceptor: SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord record, SerializedKeyValue serializedKeyValue); ConsumerInterceptor: SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue); I am leaning towards implementing the minimal set of immutable or mutable interfaces, making sure that we have a compatibility plan that allows us to add more callbacks in the future (per Ismael comment), and add more APIs later. E.g., for encryption use-case, there could be an argument in doing encryption after message compression vs. per-record encryption that could be done using the above additional API. There is also more implications for every API that modifies records: modifying serialized key/value will again impact partition assignment (we will likely do that after partition assignment), which may impact log compaction and mirror maker partitioning. Thanks, Anna On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino wrote: > Finally got a chance to take a look at this. I won’t be able to make the > KIP meeting due to a conflict. > > I’m somewhat disappointed in this proposal. I think that the explicit > exclusion of modification of the messages is short-sighted, and not > accounting for it now is going to bite us later. Jay, aren’t you the one > railing against public interfaces and how difficult they are to work with > when you don’t get them right? The “simple” change to one of these > interfaces to make it able to return a record is going to be a significant > change and is going to require all clients to rewrite their interceptors. > If we’re not willing to put the time to think through manipulation now, > then this KIP should be shelved until we are. Implementing something > halfway is going to be worse than taking a little longer. 
> In addition, I don’t believe that manipulation requires anything more than interceptors to receive the full record, and then to return it.
>
> There are 3 use cases I can think of right now without any deep discussion that can make use of interceptors with modification:
>
> 1. Auditing. The ability to add metadata to a message for auditing is critical. Hostname, service name, timestamps, etc. are all pieces of data that can be used on the other side of the pipeline to categorize messages, determine loss and transport time, and pin down issues. You may say that these things can just be part of the message schema, but anyone who has worked with a multi-user data system (especially those who have been involved with LinkedIn) knows how difficult it is to maintain consistent message schemas and to get other people to put in fields for your use.
>
> 2. Encryption. This is probably the most obvious case for record manipulation on both sides. The ability to tie in end to end encryption is important for data that requires external compliance (PCI, HIPAA, etc.).
>
> 3. Routing. By being able to add a bit of information about the source or destination of a message to the metadata, you can easily construct an intelligent mirror maker that can prevent loops. This has the opportunity to result in
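For concreteness, here is a sketch of how the producer side of the two options might read in Java; the generics and exact signatures are illustrative only, not the final KIP API:

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Option 2 (mutable): onSend() may return a modified record.
    // Under option 1 (immutable) the only difference is the signature
    //     public void onSend(ProducerRecord<K, V> record);
    // i.e. the interceptor observes the record but cannot replace it.
    public interface ProducerInterceptor<K, V> {
        ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
        void onAcknowledgement(RecordMetadata metadata, Exception exception);
    }

The consumer side would mirror this: under option 1, onConsume() stays void; under option 2, it returns a (possibly rebuilt) ConsumerRecords.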
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi,

I won't be able to make it to the KIP hangout due to a conflict.

Anna, here is the use case where knowing whether there are messages in the RecordAccumulator left to be sent to the Kafka cluster for a topic is useful:

1) Consider a pipeline: A ---> Mirror-maker -> B

2) We have a topic T in cluster A mirrored to cluster B.

3) Now if we delete topic T in A and immediately proceed to delete the topic in cluster B, some of the Mirror-maker machines die because at least one of the batches in the RecordAccumulator for topic T fails to be produced to cluster B. We have seen this happening in our clusters.

If we know that there are no more messages left in the RecordAccumulator to be produced to cluster B, we can safely delete the topic in cluster B without disturbing the pipeline.

Thanks,

Mayuresh

On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner wrote:
> Thanks Ismael and Todd for your feedback!
>
> I agree about coming up with lean, but useful interfaces that will be easy to extend later.
>
> When we discuss the minimal set of producer and consumer interceptor API in today’s KIP meeting (discussion item #2 in my previous email), lets compare two options:
>
> *1. Minimal set of immutable API for producer and consumer interceptors*
>
> ProducerInterceptor:
> public void onSend(ProducerRecord record);
> public void onAcknowledgement(RecordMetadata metadata, Exception exception);
>
> ConsumerInterceptor:
> public void onConsume(ConsumerRecords records);
> public void onCommit(Map offsets);
>
> Use-cases:
> — end-to-end monitoring; custom tracing and logging
>
> *2. Minimal set of mutable API for producer and consumer interceptors*
>
> ProducerInterceptor:
> ProducerRecord onSend(ProducerRecord record);
> void onAcknowledgement(RecordMetadata metadata, Exception exception);
>
> ConsumerInterceptor:
> void onConsume(ConsumerRecords records);
> void onCommit(Map offsets);
>
> Additional use-cases to #1:
> — Ability to add metadata to a message or fill in standard fields for audit and routing.
>
> Implications
> — Partition assignment will be done based on modified key/value instead of original key/value. If key/value transformation is not consistent (same key and value does not mutate to the same, but modified, key/value), then log compaction would not work. However, audit and routing use-cases from Todd will likely do consistent transformation.
>
> *Additional callbacks (discussion item #3 in my previous email):*
>
> If we want to support encryption, we would want to be able to modify serialized key/value, rather than key and value objects. This will add the following API to producer and consumer interceptors:
>
> ProducerInterceptor:
> SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord record, SerializedKeyValue serializedKeyValue);
>
> ConsumerInterceptor:
> SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue);
>
> I am leaning towards implementing the minimal set of immutable or mutable interfaces, making sure that we have a compatibility plan that allows us to add more callbacks in the future (per Ismael comment), and add more APIs later. E.g., for encryption use-case, there could be an argument in doing encryption after message compression vs. per-record encryption that could be done using the above additional API.
> There is also more implications for every API that modifies records: modifying serialized key/value will again impact partition assignment (we will likely do that after partition assignment), which may impact log compaction and mirror maker partitioning.
>
> Thanks,
> Anna
>
> On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino wrote:
>
> > Finally got a chance to take a look at this. I won’t be able to make the KIP meeting due to a conflict.
> >
> > I’m somewhat disappointed in this proposal. I think that the explicit exclusion of modification of the messages is short-sighted, and not accounting for it now is going to bite us later. Jay, aren’t you the one railing against public interfaces and how difficult they are to work with when you don’t get them right? The “simple” change to one of these interfaces to make it able to return a record is going to be a significant change and is going to require all clients to rewrite their interceptors. If we’re not willing to put the time to think through manipulation now, then this KIP should be shelved until we are. Implementing something halfway is going to be worse than taking a little longer. In addition, I don’t believe that manipulation requires anything more than interceptors to receive the full record, and then to return it.
> >
> > There are 3 use case I can think of right now without any deep discussion that can make use of interceptors with modification:
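As a stopgap for the use-case Mayuresh describes, the closest approximation in the current producer API is flush(); a sketch of gating the downstream delete on it (deleteTopic() is a placeholder for whatever admin tooling performs the delete in cluster B):

    import org.apache.kafka.clients.producer.Producer;

    public class SafeTopicDelete {
        // Hypothetical guard for the pipeline above. flush() blocks until
        // every record currently in the RecordAccumulator has completed
        // (sent or failed), but for all topics, not just the one being
        // deleted, so it is a blunter tool than a per-topic check.
        public static void deleteDownstream(Producer<byte[], byte[]> mirrorProducer,
                                            String topic) {
            mirrorProducer.flush();
            // deleteTopic(topic);  // placeholder for the admin call
        }
    }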
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi All,

Here are the meeting notes from today’s KIP meeting:

1. We agreed to keep the scope of this KIP to producer and consumer interceptors only. A broker-side interceptor will be added later as a separate KIP. The reasons were already mentioned in this thread, but the summary is:
* A broker interceptor is riskier and requires careful consideration about overheads, whether to intercept leaders vs. leaders/replicas, what to do on leader failover, and so on.
* Broker interceptors increase monitoring resolution, but not including them in this KIP does not reduce the usefulness of producer and consumer interceptors that enable end-to-end monitoring.

2. We agreed to scope the ProducerInterceptor and ConsumerInterceptor callbacks to a minimal set of mutable APIs that do not depend on producer and consumer internal implementation.

ProducerInterceptor:
*ProducerRecord onSend(ProducerRecord record);*
*void onAcknowledgement(RecordMetadata metadata, Exception exception);*

ConsumerInterceptor:
*ConsumerRecords onConsume(ConsumerRecords records);*
*void onCommit(Map offsets);*

We will allow interceptors to modify the ProducerRecord on the producer side, and modify ConsumerRecords on the consumer side. This will support end-to-end monitoring and auditing, plus the ability to add metadata to a message, which covers Todd’s Auditing and Routing use-cases.

We did not find any use-case for modifying records in the onConsume() callback, but decided to enable modification of consumer records for symmetry with onSend().

3. We agreed to ensure compatibility when/if we add new methods to ProducerInterceptor and ConsumerInterceptor by using default methods with an empty implementation. It is OK to assume Java 8. (This is Ismael’s method #2.)

4. We decided not to add any callbacks to producer and consumer interceptors that depend on internal implementation as part of this KIP. However, it is possible to add them later as part of another KIP if there are good use-cases.

*Reasoning.* We did not have concrete use-cases that justified more methods at this point. Some of the use-cases were for finer-grained latency collection, which could be done with Kafka Metrics. Another use-case was encryption. However, there are several design options for encryption. One is per-record encryption, which would require adding ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One could argue that in that case encryption could be done by adding a custom serializer/deserializer. Another option is to do encryption after the message gets compressed, but there are issues that arise regarding the broker doing re-compression. We decided that it is better to have that discussion in a separate KIP and decide whether this is something we want to do with interceptors or by other means.

Todd, Mayuresh and others who missed the KIP meeting, please let me know your thoughts on the scope we agreed on during the meeting.

I will update the KIP proposal with the current decision by end of today.

Thanks,
Anna

On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> Hi,
>
> I won't be able to make it to KIP hangout due to conflict.
>
> Anna, here is the use case where knowing if there are messages in the RecordAccumulator left to be sent to the kafka cluster for a topic is useful.
>
> 1) Consider a pipeline :
> A ---> Mirror-maker -> B
>
> 2) We have a topic T in cluster A mirrored to cluster B.
> > 3) Now if we delete topic T in A and immediately proceed to delete the > topic in cluster B, some of the the Mirror-maker machines die because > atleast one of the batches in RecordAccumulator for topic T fail to be > produced to cluster B. We have seen this happening in our clusters. > > > If we know that there are no more messages left in the RecordAccumulator to > be produced to cluster B, we can safely delete the topic in cluster B > without disturbing the pipeline. > > Thanks, > > Mayuresh > > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner wrote: > > > Thanks Ismael and Todd for your feedback! > > > > I agree about coming up with lean, but useful interfaces that will be > easy > > to extend later. > > > > When we discuss the minimal set of producer and consumer interceptor API > in > > today’s KIP meeting (discussion item #2 in my previous email), lets > compare > > two options: > > > > *1. Minimal set of immutable API for producer and consumer interceptors* > > > > ProducerInterceptor: > > public void onSend(ProducerRecord record); > > public void onAcknowledgement(RecordMetadata metadata, Exception > > exception); > > > > ConsumerInterceptor: > > public void onConsume(ConsumerRecords records); > > public void onCommit(Map offsets); > > > > Use-cases: > > — end-to-end monitoring; custom tracing and logging > > > > > > *2. Minimal set of mutable API for producer and consumer interceptors* > >
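Putting points 2 and 3 of the notes together, the consumer side might read as follows; the signatures, and OffsetAndMetadata as the commit-map value type, are illustrative rather than final:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public interface ConsumerInterceptor<K, V> {
        ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
        void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

        // A method added by some future KIP: existing interceptor
        // implementations keep compiling and running unchanged because
        // of the empty Java 8 default body (Ismael's method #2).
        default void onSomeFutureEvent() { }
    }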
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
This looks good. As noted, having one mutable interceptor on each side allows for the use cases we can envision right now, and I think that’s going to provide a great deal of opportunity for implementing things like audit, especially within a multi-tenant environment. Looking forward to getting this available in the clients. Thanks! -Todd On Tue, Jan 26, 2016 at 2:36 PM, Anna Povznerwrote: > Hi All, > > Here is meeting notes from today’s KIP meeting: > > 1. We agreed to keep the scope of this KIP to be producer and consumer > interceptors only. Broker-side interceptor will be added later as a > separate KIP. The reasons were already mentioned in this thread, but the > summary is: > * Broker interceptor is riskier and requires careful consideration about > overheads, whether to intercept leaders vs. leaders/replicas, what to do on > leader failover and so on. > * Broker interceptors increase monitoring resolution, but not including it > in this KIP does not reduce usefulness of producer and consumer > interceptors that enable end-to-end monitoring > > 2. We agreed to scope ProducerInterceptor and ConsumerInterceptor callbacks > to minimal set of mutable API that are not dependent on producer and > consumer internal implementation. > > ProducerInterceptor: > *ProducerRecord onSend(ProducerRecord record);* > *void onAcknowledgement(RecordMetadata metadata, Exception exception);* > > ConsumerInterceptor: > *ConsumerRecords onConsume(ConsumerRecords records);* > *void onCommit(Map offsets);* > > We will allow interceptors to modify ProducerRecord on producer side, and > modify ConsumerRecords on consumer side. This will support end-to-end > monitoring and auditing and support the ability to add metadata for a > message. This will support Todd’s Auditing and Routing use-cases. > > We did not find any use-case for modifying records in onConsume() callback, > but decided to enable modification of consumer records for symmetry with > onSend(). > > 3. We agreed to ensure compatibility when/if we add new methods to > ProducerInterceptor and ConsumerInterceptor by using default methods with > an empty implementation. Ok to assume Java 8. (This is Ismael’s method #2). > > 4. We decided not to add any callbacks to producer and consumer > interceptors that will depend on internal implementation as part of this > KIP. However, it is possible to add them later as part of another KIP if > there are good use-cases. > > *Reasoning.* We did not have concrete use-cases that justified more methods > at this point. Some of the use-cases were for more fine-grain latency > collection, which could be done with Kafka Metrics. Another use-case was > encryption. However, there are several design options for encryption. One > is to do per-record encryption which would require adding > ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One > could argue that in that case encryption could be done by adding a custom > serializer/deserializer. Another option is to do encryption after message > gets compressed, but there are issues that arise regarding broker doing > re-compression. We decided that it is better to have that discussion in a > separate KIP and decide that this is something we want to do with > interceptors or by other means. > > > Todd, Mayuresh and others who missed the KIP meeting, please let me know > your thoughts on the scope we agreed on during the meeting. > > I will update the KIP proposal with the current decision by end of today. 
> > Thanks, > Anna > > > On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat < > gharatmayures...@gmail.com> wrote: > > > Hi, > > > > I won't be able to make it to KIP hangout due to conflict. > > > > Anna, here is the use case where knowing if there are messages in the > > RecordAccumulator left to be sent to the kafka cluster for a topic is > > useful. > > > > 1) Consider a pipeline : > > A ---> Mirror-maker -> B > > > > 2) We have a topic T in cluster A mirrored to cluster B. > > > > 3) Now if we delete topic T in A and immediately proceed to delete the > > topic in cluster B, some of the the Mirror-maker machines die because > > atleast one of the batches in RecordAccumulator for topic T fail to be > > produced to cluster B. We have seen this happening in our clusters. > > > > > > If we know that there are no more messages left in the RecordAccumulator > to > > be produced to cluster B, we can safely delete the topic in cluster B > > without disturbing the pipeline. > > > > Thanks, > > > > Mayuresh > > > > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner > wrote: > > > > > Thanks Ismael and Todd for your feedback! > > > > > > I agree about coming up with lean, but useful interfaces that will be > > easy > > > to extend later. > > > > > > When we discuss the minimal set of producer and consumer interceptor > API > > in > > > today’s KIP meeting (discussion item #2
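As one concrete data point for the audit use-case Todd highlights, a minimal counting interceptor written against the agreed mutable onSend() might look like the sketch below (the class name, byte[] generics, and the side-topic idea are illustrative; a real auditor would batch and periodically produce the counts to an audit topic):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class CountingAuditInterceptor implements ProducerInterceptor<byte[], byte[]> {
        private final Map<String, AtomicLong> sent = new ConcurrentHashMap<>();
        private final Map<String, AtomicLong> acked = new ConcurrentHashMap<>();

        @Override
        public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
            sent.computeIfAbsent(record.topic(), t -> new AtomicLong()).incrementAndGet();
            return record;   // audit-only: the record passes through unmodified
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception == null && metadata != null)
                acked.computeIfAbsent(metadata.topic(), t -> new AtomicLong()).incrementAndGet();
            // a real auditor would periodically emit these counts to an audit topic
        }
    }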
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi Anna, Thanks a lot for summarizing the discussion on this kip. It LGTM. This is really nice : We decided not to add any callbacks to producer and consumer interceptors that will depend on internal implementation as part of this KIP. *However, it is possible to add them later as part of another KIP if there are good use-cases.* Do you agree with the use case I explained earlier for knowing the number of records left in the RecordAccumulator for a particular topic. It might be orthogonal to this KIP, but will be helpful. What do you think? Thanks, Mayuresh On Tue, Jan 26, 2016 at 2:46 PM, Todd Palinowrote: > This looks good. As noted, having one mutable interceptor on each side > allows for the use cases we can envision right now, and I think that’s > going to provide a great deal of opportunity for implementing things like > audit, especially within a multi-tenant environment. Looking forward to > getting this available in the clients. > > Thanks! > > -Todd > > > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner wrote: > > > Hi All, > > > > Here is meeting notes from today’s KIP meeting: > > > > 1. We agreed to keep the scope of this KIP to be producer and consumer > > interceptors only. Broker-side interceptor will be added later as a > > separate KIP. The reasons were already mentioned in this thread, but the > > summary is: > > * Broker interceptor is riskier and requires careful consideration about > > overheads, whether to intercept leaders vs. leaders/replicas, what to do > on > > leader failover and so on. > > * Broker interceptors increase monitoring resolution, but not including > it > > in this KIP does not reduce usefulness of producer and consumer > > interceptors that enable end-to-end monitoring > > > > 2. We agreed to scope ProducerInterceptor and ConsumerInterceptor > callbacks > > to minimal set of mutable API that are not dependent on producer and > > consumer internal implementation. > > > > ProducerInterceptor: > > *ProducerRecord onSend(ProducerRecord record);* > > *void onAcknowledgement(RecordMetadata metadata, Exception exception);* > > > > ConsumerInterceptor: > > *ConsumerRecords onConsume(ConsumerRecords records);* > > *void onCommit(Map offsets);* > > > > We will allow interceptors to modify ProducerRecord on producer side, and > > modify ConsumerRecords on consumer side. This will support end-to-end > > monitoring and auditing and support the ability to add metadata for a > > message. This will support Todd’s Auditing and Routing use-cases. > > > > We did not find any use-case for modifying records in onConsume() > callback, > > but decided to enable modification of consumer records for symmetry with > > onSend(). > > > > 3. We agreed to ensure compatibility when/if we add new methods to > > ProducerInterceptor and ConsumerInterceptor by using default methods with > > an empty implementation. Ok to assume Java 8. (This is Ismael’s method > #2). > > > > 4. We decided not to add any callbacks to producer and consumer > > interceptors that will depend on internal implementation as part of this > > KIP. However, it is possible to add them later as part of another KIP if > > there are good use-cases. > > > > *Reasoning.* We did not have concrete use-cases that justified more > methods > > at this point. Some of the use-cases were for more fine-grain latency > > collection, which could be done with Kafka Metrics. Another use-case was > > encryption. However, there are several design options for encryption. 
One > > is to do per-record encryption which would require adding > > ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One > > could argue that in that case encryption could be done by adding a custom > > serializer/deserializer. Another option is to do encryption after message > > gets compressed, but there are issues that arise regarding broker doing > > re-compression. We decided that it is better to have that discussion in a > > separate KIP and decide that this is something we want to do with > > interceptors or by other means. > > > > > > Todd, Mayuresh and others who missed the KIP meeting, please let me know > > your thoughts on the scope we agreed on during the meeting. > > > > I will update the KIP proposal with the current decision by end of today. > > > > Thanks, > > Anna > > > > > > On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat < > > gharatmayures...@gmail.com> wrote: > > > > > Hi, > > > > > > I won't be able to make it to KIP hangout due to conflict. > > > > > > Anna, here is the use case where knowing if there are messages in the > > > RecordAccumulator left to be sent to the kafka cluster for a topic is > > > useful. > > > > > > 1) Consider a pipeline : > > > A ---> Mirror-maker -> B > > > > > > 2) We have a topic T in cluster A mirrored to cluster B. > > > > > > 3) Now if we delete topic T in A and immediately proceed to delete
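For the Routing use-case agreed in the meeting notes, a mirror maker could use the mutable callbacks on both sides to prevent loops. The sketch below is purely illustrative: string-prefix tagging stands in for a real envelope format, and "cluster-A" is an assumed local cluster id.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class LoopGuard {
        private static final String ORIGIN = "cluster-A|";   // assumed local cluster id

        // Producer side: tag every mirrored value with its origin.
        public static ProducerRecord<String, String> onSend(ProducerRecord<String, String> r) {
            return new ProducerRecord<>(r.topic(), r.key(), ORIGIN + r.value());
        }

        // Consumer side: drop anything that has looped back to its origin.
        public static ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            Map<TopicPartition, List<ConsumerRecord<String, String>>> kept = new HashMap<>();
            for (ConsumerRecord<String, String> r : records)
                if (!r.value().startsWith(ORIGIN))
                    kept.computeIfAbsent(new TopicPartition(r.topic(), r.partition()),
                                         tp -> new ArrayList<>()).add(r);
            return new ConsumerRecords<>(kept);
        }
    }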
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Anna, I'm also in favor of including just the APIs for which we have a clear use case. If more use cases for finer monitoring show up in the future, we can always update the interface. Would you please highlight in the KIP the APIs that you think we have an immediate use for? Joel, Broker-side monitoring makes a lot of sense in the long term though I don't think it is a requirement for end-to-end monitoring. With the producer and consumer interceptors, you have the ability to get full publish-to-subscribe end-to-end monitoring. The broker interceptor certainly improves the resolution of monitoring but it is also a riskier change. I prefer an incremental approach over a big-bang and recommend taking baby-steps. Let's first make sure the producer/consumer interceptors are successful. And then come back and add the broker interceptor carefully. Having said that, it would be great to understand your proposal for the broker interceptor independently. We can either add an interceptor on-append or on-commit. If people want to use this for monitoring, then possibly on-commit might be more useful? Thanks, Neha On Mon, Jan 25, 2016 at 6:47 PM, Jay Krepswrote: > Hey Joel, > > What is the interface you are thinking of? Something like this: > onAppend(String topic, int partition, Records records, long time) > ? > > One challenge right now is that we are still using the old > Message/MessageSet classes on the broker which I'm not sure if we'd want to > support over the long haul but it might be okay just to create the records > instance for this interface. > > -Jay > > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy wrote: > > > I'm definitely in favor of having such hooks in the produce/consume > > life-cycle. Not sure if people remember this but in Kafka 0.7 this was > > pretty much how it was: > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala > > i.e., we had something similar to the interceptor proposal for various > > stages of the producer request. The producer provided call-backs for > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at > > LinkedIn we in fact did auditing within these call-backs (and not > > explicitly in the wrapper). Over time and with 0.8 we moved that out to > the > > wrapper libraries. > > > > On a side-note while audit and other monitoring can be done internally > in a > > convenient way I think it should be clarified that having a wrapper is in > > general not a bad idea and I would even consider it to be a > best-practice. > > Even with 0.7 we still had a wrapper library and that API has largely > > stayed the same and has helped protect against (sometimes backwards > > incompatible) changes in open source. > > > > While we are on this topic I have one comment and Anna, you may have > > already considered this but I don't see mention of it in the KIP: > > > > Add a custom message interceptor/validator on the broker on message > > arrival. > > > > We decompress and do basic validation of messages on arrival. I think > there > > is value in supporting custom validation and expand it to support custom > > on-arrival processing. Here is a specific use-case I have in mind. The > blog > > that James referenced describes our auditing infrastructure. In order to > > audit the Kafka cluster itself we need to run a "console auditor" service > > that consumes everything and spits out audit events back to the cluster. 
> I > > would prefer not having to run this service because: > > > >- Well, it is one more service that we have to run and monitor > >- Consuming everything takes up bandwidth which can be avoided > >- The console auditor consumer itself can lag and cause temporary > audit > >discrepancies > > > > One way we can mitigate this is by having mirror-makers in between > clusters > > emit audit events. The problem is that the very last cluster in the > > pipeline will not have any audit which is why we need to have something > to > > audit the cluster. > > > > If we had a custom message validator then the audit can be done > on-arrival > > and we won't need a console auditor. > > > > One potential issue in this approach and any elaborate on-arrival > > processing for that matter is that you may need to deserialize the > message > > as well which can drive up produce request handling times. However I'm > not > > terribly concerned about that especially if the audit header can be > > separated out easily or even deserialized partially as this Avro thread > > touches on > > > > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+ > > > > Thanks, > > > > Joel > > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat < > > gharatmayures...@gmail.com> wrote: > > > > > Nice KIP. Excellent idea. > > > Was just thinking if we can add onDequed() to the ProducerIterceptor > > > interface. Since
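To make the on-append vs. on-commit distinction concrete, here is a sketch of Jay's strawman next to an on-commit variant; this interface is hypothetical and explicitly out of scope for this KIP:

    import org.apache.kafka.common.record.Records;

    // Hypothetical broker-side hook, not part of KIP-42.
    public interface BrokerInterceptor {
        // On-append: fires when the leader appends a produce request to its log.
        void onAppend(String topic, int partition, Records records, long time);

        // On-commit: fires only once the messages are replicated to the full
        // ISR, which is likely the more useful point for audit and monitoring
        // since uncommitted appends can be lost on leader failover.
        void onCommit(String topic, int partition, long committedOffset, long time);
    }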
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Thanks everyone for your comments so far! Since we have a KIP meeting tomorrow, here is the list of items I propose to discuss in the meeting based on everyone's comments.

*1. Should we add a broker-side interceptor to this KIP?*

Pros: improves the resolution of monitoring.
Cons: riskier and a bigger change.

Current proposal: do not include a broker-side interceptor in KIP-42, but create a separate broker interceptor KIP later.

*2. Minimal set of ProducerInterceptor and ConsumerInterceptor API*

These are the APIs that are required for end-to-end tracing and monitoring and are *not* dependent on the producer and consumer implementation. Here is the proposal to start the discussion:

*ProducerInterceptor:*
public void onSend(ProducerRecord record);
public void onAcknowledgement(RecordMetadata metadata, Exception exception);

*ConsumerInterceptor:*
public void onConsume(ConsumerRecords records);
public void onCommit(Map offsets);

These APIs support possibly different requirements for what end-to-end means -- e.g., from sending to the producer to delivering to the consumer client, or only accounting for messages that were successfully produced and/or messages that we know were handled by an application.

*3. Should we add more callbacks to ProducerInterceptor and ConsumerInterceptor?*

Additional callbacks will be more dependent on the implementation, and therefore changing the internal producer or consumer implementation later may cause changes in the interceptor interfaces.

List of additional callbacks that were mentioned so far:
*1. ProducerInterceptor.onEnqueued(TopicPartition tp, ProducerRecord record, SerializedKeyValue serializedKeyValue);*
*2. ProducerInterceptor.onDequeued(TopicPartition tp, long appendTime, int attempts, boolean moreQueued);*
*3. ConsumerInterceptor.onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue);*

Use-cases:
1. Per-message, finer-grained latencies (serialization overhead, time spent in the record accumulator, latency inside the consumer). Most of these latencies are available as averages/max via Kafka Metrics (or we could add corresponding Kafka Metrics). One potential advantage is collecting these metrics per-message, e.g. for calculating latency percentiles rather than averages/max. However, it is not clear if that much detail is actually needed by any real use-cases. Needs discussion.
2. Any other use-cases?

Thanks,
Anna

On Mon, Jan 25, 2016 at 9:59 PM, Neha Narkhede wrote:
> Anna,
>
> I'm also in favor of including just the APIs for which we have a clear use case. If more use cases for finer monitoring show up in the future, we can always update the interface. Would you please highlight in the KIP the APIs that you think we have an immediate use for?
>
> Joel,
>
> Broker-side monitoring makes a lot of sense in the long term though I don't think it is a requirement for end-to-end monitoring. With the producer and consumer interceptors, you have the ability to get full publish-to-subscribe end-to-end monitoring. The broker interceptor certainly improves the resolution of monitoring but it is also a riskier change. I prefer an incremental approach over a big-bang and recommend taking baby-steps. Let's first make sure the producer/consumer interceptors are successful. And then come back and add the broker interceptor carefully.
>
> Having said that, it would be great to understand your proposal for the broker interceptor independently. We can either add an interceptor on-append or on-commit.
If people want to use this for monitoring, then > possibly on-commit might be more useful? > > Thanks, > Neha > > On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps wrote: > > > Hey Joel, > > > > What is the interface you are thinking of? Something like this: > > onAppend(String topic, int partition, Records records, long time) > > ? > > > > One challenge right now is that we are still using the old > > Message/MessageSet classes on the broker which I'm not sure if we'd want > to > > support over the long haul but it might be okay just to create the > records > > instance for this interface. > > > > -Jay > > > > On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy > wrote: > > > > > I'm definitely in favor of having such hooks in the produce/consume > > > life-cycle. Not sure if people remember this but in Kafka 0.7 this was > > > pretty much how it was: > > > > > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala > > > i.e., we had something similar to the interceptor proposal for various > > > stages of the producer request. The producer provided call-backs for > > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at > > > LinkedIn we in fact did auditing within these call-backs (and not > > > explicitly in the wrapper). Over time and with 0.8 we moved that out to > > the > > > wrapper libraries. > > > > > >
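Note that SerializedKeyValue in callbacks 1 and 3 above is not an existing class; the sketches imply a minimal byte carrier along these lines:

    // Hypothetical carrier for the post-serialization bytes; no such class
    // exists in the clients today.
    public class SerializedKeyValue {
        public final byte[] key;
        public final byte[] value;

        public SerializedKeyValue(byte[] key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

An encrypting interceptor would transform value in onEnqueued() on the producer and invert the transformation in onReceive() on the consumer, leaving the key/value objects (and therefore the serializers) untouched.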
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Great idea. I’ve been talking about this for 2 years, and I’m glad someone is finally picking it up. Will take a look at the KIP at some point shortly. -Todd On Mon, Jan 25, 2016 at 11:24 AM, Jay Krepswrote: > Hey Becket, > > Yeah this is really similar to the callback. The difference is really in > who sets the behavior. The idea of the interceptor is that it doesn't > require any code change in apps so you can globally add behavior to your > Kafka usage without changing app code. Whereas the callback is added by the > app. The idea is to kind of obviate the need for the wrapper code that e.g. > LinkedIn maintains to hold this kind of stuff. > > -Jay > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin wrote: > > > This could be a useful feature. And I think there are some use cases to > > mutate the data like rejected alternative one mentioned. > > > > I am wondering if there is functional overlapping between > > ProducerInterceptor.onAcknowledgement() and the producer callback? I can > > see that the Callback could be a per record setting while > > onAcknowledgement() is a producer level setting. Other than that, is > there > > any difference between them? > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede > wrote: > > > > > James, > > > > > > That is one of the many monitoring use cases for the interceptor > > interface. > > > > > > Thanks, > > > Neha > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng wrote: > > > > > > > Anna, > > > > > > > > I'm trying to understand a concrete use case. It sounds like producer > > > > interceptors could be used to implement part of LinkedIn's Kafak > Audit > > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale > > > > > > > > Part of that is done by a wrapper library around the kafka producer > > that > > > > keeps a count of the number of messages produced, and then sends that > > > count > > > > to a side-topic. It sounds like the producer interceptors could > > possibly > > > be > > > > used to implement that? > > > > > > > > -James > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner > wrote: > > > > > > > > > > Hi, > > > > > > > > > > I just created a KIP-42 for adding producer and consumer > interceptors > > > for > > > > > intercepting messages at different points on producer and consumer. > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors > > > > > > > > > > Comments and suggestions are welcome! > > > > > > > > > > Thanks, > > > > > Anna > > > > > > > > > > > > > > > > > > > > This email and any attachments may contain confidential and > privileged > > > > material for the sole use of the intended recipient. Any review, > > copying, > > > > or distribution of this email (or any attachments) by others is > > > prohibited. > > > > If you are not the intended recipient, please contact the sender > > > > immediately and permanently delete this email and any attachments. No > > > > employee or agent of TiVo Inc. is authorized to conclude any binding > > > > agreement on behalf of TiVo Inc. by email. Binding agreements with > TiVo > > > > Inc. may only be made by a signed written agreement. > > > > > > > > > > > > > > > > -- > > > Thanks, > > > Neha > > > > > > -- *—-* *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
I'm definitely in favor of having such hooks in the produce/consume life-cycle. Not sure if people remember this but in Kafka 0.7 this was pretty much how it was: https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala i.e., we had something similar to the interceptor proposal for various stages of the producer request. The producer provided call-backs for beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at LinkedIn we in fact did auditing within these call-backs (and not explicitly in the wrapper). Over time and with 0.8 we moved that out to the wrapper libraries. On a side-note while audit and other monitoring can be done internally in a convenient way I think it should be clarified that having a wrapper is in general not a bad idea and I would even consider it to be a best-practice. Even with 0.7 we still had a wrapper library and that API has largely stayed the same and has helped protect against (sometimes backwards incompatible) changes in open source. While we are on this topic I have one comment and Anna, you may have already considered this but I don't see mention of it in the KIP: Add a custom message interceptor/validator on the broker on message arrival. We decompress and do basic validation of messages on arrival. I think there is value in supporting custom validation and expand it to support custom on-arrival processing. Here is a specific use-case I have in mind. The blog that James referenced describes our auditing infrastructure. In order to audit the Kafka cluster itself we need to run a "console auditor" service that consumes everything and spits out audit events back to the cluster. I would prefer not having to run this service because: - Well, it is one more service that we have to run and monitor - Consuming everything takes up bandwidth which can be avoided - The console auditor consumer itself can lag and cause temporary audit discrepancies One way we can mitigate this is by having mirror-makers in between clusters emit audit events. The problem is that the very last cluster in the pipeline will not have any audit which is why we need to have something to audit the cluster. If we had a custom message validator then the audit can be done on-arrival and we won't need a console auditor. One potential issue in this approach and any elaborate on-arrival processing for that matter is that you may need to deserialize the message as well which can drive up produce request handling times. However I'm not terribly concerned about that especially if the audit header can be separated out easily or even deserialized partially as this Avro thread touches on http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+ Thanks, Joel On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat < gharatmayures...@gmail.com> wrote: > Nice KIP. Excellent idea. > Was just thinking if we can add onDequed() to the ProducerIterceptor > interface. Since we have the onEnqueued(), it will help the client or the > tools to know how much time the message spent in the RecordAccumulator. > Also an API to check if there are any messages left for a particular topic > in the RecordAccumulator would help. > > Thanks, > > Mayuresh > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palinowrote: > > > Great idea. I’ve been talking about this for 2 years, and I’m glad > someone > > is finally picking it up. Will take a look at the KIP at some point > > shortly. 
> > > > -Todd > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps wrote: > > > > > Hey Becket, > > > > > > Yeah this is really similar to the callback. The difference is really > in > > > who sets the behavior. The idea of the interceptor is that it doesn't > > > require any code change in apps so you can globally add behavior to > your > > > Kafka usage without changing app code. Whereas the callback is added by > > the > > > app. The idea is to kind of obviate the need for the wrapper code that > > e.g. > > > LinkedIn maintains to hold this kind of stuff. > > > > > > -Jay > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin > > wrote: > > > > > > > This could be a useful feature. And I think there are some use cases > to > > > > mutate the data like rejected alternative one mentioned. > > > > > > > > I am wondering if there is functional overlapping between > > > > ProducerInterceptor.onAcknowledgement() and the producer callback? I > > can > > > > see that the Callback could be a per record setting while > > > > onAcknowledgement() is a producer level setting. Other than that, is > > > there > > > > any difference between them? > > > > > > > > Thanks, > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede > > > wrote: > > > > > > > > > James, > > > > > > > > > > That is one of the many monitoring use cases
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hey Becket, Yeah this is really similar to the callback. The difference is really in who sets the behavior. The idea of the interceptor is that it doesn't require any code change in apps so you can globally add behavior to your Kafka usage without changing app code. Whereas the callback is added by the app. The idea is to kind of obviate the need for the wrapper code that e.g. LinkedIn maintains to hold this kind of stuff. -Jay On Sun, Jan 24, 2016 at 4:21 PM, Becket Qinwrote: > This could be a useful feature. And I think there are some use cases to > mutate the data like rejected alternative one mentioned. > > I am wondering if there is functional overlapping between > ProducerInterceptor.onAcknowledgement() and the producer callback? I can > see that the Callback could be a per record setting while > onAcknowledgement() is a producer level setting. Other than that, is there > any difference between them? > > Thanks, > > Jiangjie (Becket) Qin > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede wrote: > > > James, > > > > That is one of the many monitoring use cases for the interceptor > interface. > > > > Thanks, > > Neha > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng wrote: > > > > > Anna, > > > > > > I'm trying to understand a concrete use case. It sounds like producer > > > interceptors could be used to implement part of LinkedIn's Kafak Audit > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale > > > > > > Part of that is done by a wrapper library around the kafka producer > that > > > keeps a count of the number of messages produced, and then sends that > > count > > > to a side-topic. It sounds like the producer interceptors could > possibly > > be > > > used to implement that? > > > > > > -James > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner wrote: > > > > > > > > Hi, > > > > > > > > I just created a KIP-42 for adding producer and consumer interceptors > > for > > > > intercepting messages at different points on producer and consumer. > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors > > > > > > > > Comments and suggestions are welcome! > > > > > > > > Thanks, > > > > Anna > > > > > > > > > > > > > > > This email and any attachments may contain confidential and privileged > > > material for the sole use of the intended recipient. Any review, > copying, > > > or distribution of this email (or any attachments) by others is > > prohibited. > > > If you are not the intended recipient, please contact the sender > > > immediately and permanently delete this email and any attachments. No > > > employee or agent of TiVo Inc. is authorized to conclude any binding > > > agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo > > > Inc. may only be made by a signed written agreement. > > > > > > > > > > > -- > > Thanks, > > Neha > > >
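The distinction Jay draws shows up directly in how each mechanism is wired: a Callback travels with an individual send() call, while an interceptor would be named once in the client configuration and apply to every record. A sketch follows; the "interceptor.classes" config key is a guess at what the KIP might settle on, and CountingAuditInterceptor is a placeholder class name:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class WiringExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Interceptor: global behavior enabled by configuration alone,
            // with no change at any send() call site (config key is a guess).
            props.put("interceptor.classes", "com.example.CountingAuditInterceptor");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Callback: per-record behavior chosen by the app at each call site.
                producer.send(new ProducerRecord<>("topic", "key", "value"),
                              (metadata, exception) -> { /* per-send handling */ });
            }
        }
    }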
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Nice KIP. Excellent idea. Was just thinking if we can add onDequed() to the ProducerIterceptor interface. Since we have the onEnqueued(), it will help the client or the tools to know how much time the message spent in the RecordAccumulator. Also an API to check if there are any messages left for a particular topic in the RecordAccumulator would help. Thanks, Mayuresh On Mon, Jan 25, 2016 at 11:29 AM, Todd Palinowrote: > Great idea. I’ve been talking about this for 2 years, and I’m glad someone > is finally picking it up. Will take a look at the KIP at some point > shortly. > > -Todd > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps wrote: > > > Hey Becket, > > > > Yeah this is really similar to the callback. The difference is really in > > who sets the behavior. The idea of the interceptor is that it doesn't > > require any code change in apps so you can globally add behavior to your > > Kafka usage without changing app code. Whereas the callback is added by > the > > app. The idea is to kind of obviate the need for the wrapper code that > e.g. > > LinkedIn maintains to hold this kind of stuff. > > > > -Jay > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin > wrote: > > > > > This could be a useful feature. And I think there are some use cases to > > > mutate the data like rejected alternative one mentioned. > > > > > > I am wondering if there is functional overlapping between > > > ProducerInterceptor.onAcknowledgement() and the producer callback? I > can > > > see that the Callback could be a per record setting while > > > onAcknowledgement() is a producer level setting. Other than that, is > > there > > > any difference between them? > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede > > wrote: > > > > > > > James, > > > > > > > > That is one of the many monitoring use cases for the interceptor > > > interface. > > > > > > > > Thanks, > > > > Neha > > > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng > wrote: > > > > > > > > > Anna, > > > > > > > > > > I'm trying to understand a concrete use case. It sounds like > producer > > > > > interceptors could be used to implement part of LinkedIn's Kafak > > Audit > > > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale > > > > > > > > > > Part of that is done by a wrapper library around the kafka producer > > > that > > > > > keeps a count of the number of messages produced, and then sends > that > > > > count > > > > > to a side-topic. It sounds like the producer interceptors could > > > possibly > > > > be > > > > > used to implement that? > > > > > > > > > > -James > > > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner > > wrote: > > > > > > > > > > > > Hi, > > > > > > > > > > > > I just created a KIP-42 for adding producer and consumer > > interceptors > > > > for > > > > > > intercepting messages at different points on producer and > consumer. > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors > > > > > > > > > > > > Comments and suggestions are welcome! > > > > > > > > > > > > Thanks, > > > > > > Anna > > > > > > > > > > > > > > > > > > > > > > > > > This email and any attachments may contain confidential and > > privileged > > > > > material for the sole use of the intended recipient. Any review, > > > copying, > > > > > or distribution of this email (or any attachments) by others is > > > > prohibited. 
> > > > > If you are not the intended recipient, please contact the sender > > > > > immediately and permanently delete this email and any attachments. > No > > > > > employee or agent of TiVo Inc. is authorized to conclude any > binding > > > > > agreement on behalf of TiVo Inc. by email. Binding agreements with > > TiVo > > > > > Inc. may only be made by a signed written agreement. > > > > > > > > > > > > > > > > > > > > > -- > > > > Thanks, > > > > Neha > > > > > > > > > > > > > -- > *—-* > *Todd Palino* > Staff Site Reliability Engineer > Data Infrastructure Streaming > > > > linkedin.com/in/toddpalino > -- -Regards, Mayuresh R. Gharat (862) 250-7125
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi Mayuresh,

I agree that an onDequeued() callback in ProducerInterceptor would be useful. The API of this callback needs discussion. I propose the following:

public void onDequeued(TopicPartition tp, long appendTime, int attempts, boolean moreQueued);

This callback will be called for every record in a RecordBatch from the RecordAccumulator.drain() method. This API will require keeping the appendTime of every record in RecordBatch.Thunk. An alternative way for the interceptor to calculate time spent in the accumulator is to record the append time itself in onEnqueued() and then record the dequeue time in onDequeued(). However, there is no way to match which call of onDequeued() corresponds to which onEnqueued(), since the ProducerRecord object is not available anymore at the time the batch is dequeued.

Feedback on the ProducerInterceptor.onDequeued() API is welcome.

Thanks,
Anna

On Mon, Jan 25, 2016 at 1:37 PM, Joel Koshy wrote:
> I'm definitely in favor of having such hooks in the produce/consume life-cycle. Not sure if people remember this but in Kafka 0.7 this was pretty much how it was:
>
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
>
> i.e., we had something similar to the interceptor proposal for various stages of the producer request. The producer provided call-backs for beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at LinkedIn we in fact did auditing within these call-backs (and not explicitly in the wrapper). Over time and with 0.8 we moved that out to the wrapper libraries.
>
> On a side-note while audit and other monitoring can be done internally in a convenient way I think it should be clarified that having a wrapper is in general not a bad idea and I would even consider it to be a best-practice. Even with 0.7 we still had a wrapper library and that API has largely stayed the same and has helped protect against (sometimes backwards incompatible) changes in open source.
>
> While we are on this topic I have one comment and Anna, you may have already considered this but I don't see mention of it in the KIP:
>
> Add a custom message interceptor/validator on the broker on message arrival.
>
> We decompress and do basic validation of messages on arrival. I think there is value in supporting custom validation and expand it to support custom on-arrival processing. Here is a specific use-case I have in mind. The blog that James referenced describes our auditing infrastructure. In order to audit the Kafka cluster itself we need to run a "console auditor" service that consumes everything and spits out audit events back to the cluster. I would prefer not having to run this service because:
>
> - Well, it is one more service that we have to run and monitor
> - Consuming everything takes up bandwidth which can be avoided
> - The console auditor consumer itself can lag and cause temporary audit discrepancies
>
> One way we can mitigate this is by having mirror-makers in between clusters emit audit events. The problem is that the very last cluster in the pipeline will not have any audit which is why we need to have something to audit the cluster.
>
> If we had a custom message validator then the audit can be done on-arrival and we won't need a console auditor.
>
> One potential issue in this approach and any elaborate on-arrival processing for that matter is that you may need to deserialize the message as well which can drive up produce request handling times.
However I'm not > terribly concerned about that especially if the audit header can be > separated out easily or even deserialized partially as this Avro thread > touches on > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+ > > Thanks, > > Joel > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat < > gharatmayures...@gmail.com> wrote: > > > Nice KIP. Excellent idea. > > Was just thinking if we can add onDequed() to the ProducerIterceptor > > interface. Since we have the onEnqueued(), it will help the client or the > > tools to know how much time the message spent in the RecordAccumulator. > > Also an API to check if there are any messages left for a particular > topic > > in the RecordAccumulator would help. > > > > Thanks, > > > > Mayuresh > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino wrote: > > > > > Great idea. I’ve been talking about this for 2 years, and I’m glad > > someone > > > is finally picking it up. Will take a look at the KIP at some point > > > shortly. > > > > > > -Todd > > > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps wrote: > > > > > > > Hey Becket, > > > > > > > > Yeah this is really similar to the callback. The difference is really > > in > > > > who sets the behavior. The idea of the interceptor is that it doesn't > > > > require any code change in apps so you can
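If the proposed callback were adopted, an interceptor could derive the accumulator latency directly from appendTime. Below is a sketch against the proposed (not existing) signature, assuming appendTime is wall-clock milliseconds:

    import java.util.concurrent.atomic.LongAdder;
    import org.apache.kafka.common.TopicPartition;

    // Sketch against the *proposed* callback; this method does not exist
    // in any released interceptor API.
    public class QueueTimeInterceptor {
        private final LongAdder totalQueueMs = new LongAdder();
        private final LongAdder count = new LongAdder();

        public void onDequeued(TopicPartition tp, long appendTime,
                               int attempts, boolean moreQueued) {
            totalQueueMs.add(System.currentTimeMillis() - appendTime);
            count.increment();
        }

        public double avgQueueMs() {
            long n = count.sum();
            return n == 0 ? 0.0 : totalQueueMs.sum() / (double) n;
        }
    }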
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi anna, Agreed, its difficult to match which call of onDequeued() corresponds to onEnqueued(), since ProducerRecord object is not available anymore at the time the batch is dequeued. We currently always update the lastAppendTime whenever we append a record to a batch. We can use that and include it at record level for each record separately whenever it gets appended in tryAppend(). Also this might be orthogonal, but is there a plan to expose an API to check if we have messages in RecordAccumulator for a particular topic? I have a usecase that I can discuss if we plan for this API. Thanks, Mayuresh On Mon, Jan 25, 2016 at 4:49 PM, Anna Povznerwrote: > Hi Mayuresh, > > I agree that onDequeue() callback in ProducerInterceptor would be useful. > The API of this callback needs discussion. I propose the following API: > > public void onDequeued(TopicPartition tp, long appendTime, int attempts, > bool moreQueued); > > This callback will be called for every record in RecordBatch from > RecordAccumulator.drain() method. This API will require keeping appendTime > of every record in RecordBatch.Thunk. An alternative way for the > interceptor to calculate time spent in the accumulator is to record append > time itself on onEnqueued() and then record dequeued time in OnDequeued(). > However, there is no way to match which call of onDequeued() corresponds to > onEnqueued(), since ProducerRecord object is not available anymore at the > time the batch is dequeued. > > Feedback on ProducerInterceptor.onDequeued() API is welcome. > > Thanks, > Anna > > > > > On Mon, Jan 25, 2016 at 1:37 PM, Joel Koshy wrote: > > > I'm definitely in favor of having such hooks in the produce/consume > > life-cycle. Not sure if people remember this but in Kafka 0.7 this was > > pretty much how it was: > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala > > i.e., we had something similar to the interceptor proposal for various > > stages of the producer request. The producer provided call-backs for > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at > > LinkedIn we in fact did auditing within these call-backs (and not > > explicitly in the wrapper). Over time and with 0.8 we moved that out to > the > > wrapper libraries. > > > > On a side-note while audit and other monitoring can be done internally > in a > > convenient way I think it should be clarified that having a wrapper is in > > general not a bad idea and I would even consider it to be a > best-practice. > > Even with 0.7 we still had a wrapper library and that API has largely > > stayed the same and has helped protect against (sometimes backwards > > incompatible) changes in open source. > > > > While we are on this topic I have one comment and Anna, you may have > > already considered this but I don't see mention of it in the KIP: > > > > Add a custom message interceptor/validator on the broker on message > > arrival. > > > > We decompress and do basic validation of messages on arrival. I think > there > > is value in supporting custom validation and expand it to support custom > > on-arrival processing. Here is a specific use-case I have in mind. The > blog > > that James referenced describes our auditing infrastructure. In order to > > audit the Kafka cluster itself we need to run a "console auditor" service > > that consumes everything and spits out audit events back to the cluster. 
> I > > would prefer not having to run this service because: > > > >- Well, it is one more service that we have to run and monitor > >- Consuming everything takes up bandwidth which can be avoided > >- The console auditor consumer itself can lag and cause temporary > audit > >discrepancies > > > > One way we can mitigate this is by having mirror-makers in between > clusters > > emit audit events. The problem is that the very last cluster in the > > pipeline will not have any audit which is why we need to have something > to > > audit the cluster. > > > > If we had a custom message validator then the audit can be done > on-arrival > > and we won't need a console auditor. > > > > One potential issue in this approach and any elaborate on-arrival > > processing for that matter is that you may need to deserialize the > message > > as well which can drive up produce request handling times. However I'm > not > > terribly concerned about that especially if the audit header can be > > separated out easily or even deserialized partially as this Avro thread > > touches on > > > > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+ > > > > Thanks, > > > > Joel > > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat < > > gharatmayures...@gmail.com> wrote: > > > > > Nice KIP. Excellent idea. > > > Was just thinking if we can add onDequed() to the ProducerIterceptor > > > interface. Since we have the
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi Mayuresh, Could you please describe a use-case for checking if there are messages in RecordAccumulator for a particular topic? In this KIP, we are proposing interceptor callbacks that will notify things about the message, no query API. It is possible to add certain info in callback API, like 'moreQueued' boolean parameter I proposed in onDequeued() callback. However, we need to find a good balance between exposing internals of producer and consumer for finer monitoring vs. exposing too much so that changing internal implementation will cause changing interceptor interfaces. Lets discuss this during tomorrow's KIP meeting. Thanks, Anna On Mon, Jan 25, 2016 at 7:26 PM, Mayuresh Gharatwrote: > Hi anna, > > Agreed, its difficult to match which call of onDequeued() corresponds to > onEnqueued(), since ProducerRecord object is not available anymore at the > time the batch is dequeued. > > We currently always update the lastAppendTime whenever we append a record > to a batch. We can use that and include it at record level for each record > separately whenever it gets appended in tryAppend(). > > Also this might be orthogonal, but is there a plan to expose an API to > check if we have messages in RecordAccumulator for a particular topic? I > have a usecase that I can discuss if we plan for this API. > > Thanks, > > Mayuresh > > > On Mon, Jan 25, 2016 at 4:49 PM, Anna Povzner wrote: > > > Hi Mayuresh, > > > > I agree that onDequeue() callback in ProducerInterceptor would be useful. > > The API of this callback needs discussion. I propose the following API: > > > > public void onDequeued(TopicPartition tp, long appendTime, int attempts, > > bool moreQueued); > > > > This callback will be called for every record in RecordBatch from > > RecordAccumulator.drain() method. This API will require keeping > appendTime > > of every record in RecordBatch.Thunk. An alternative way for the > > interceptor to calculate time spent in the accumulator is to record > append > > time itself on onEnqueued() and then record dequeued time in > OnDequeued(). > > However, there is no way to match which call of onDequeued() corresponds > to > > onEnqueued(), since ProducerRecord object is not available anymore at the > > time the batch is dequeued. > > > > Feedback on ProducerInterceptor.onDequeued() API is welcome. > > > > Thanks, > > Anna > > > > > > > > > > On Mon, Jan 25, 2016 at 1:37 PM, Joel Koshy wrote: > > > > > I'm definitely in favor of having such hooks in the produce/consume > > > life-cycle. Not sure if people remember this but in Kafka 0.7 this was > > > pretty much how it was: > > > > > > > > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala > > > i.e., we had something similar to the interceptor proposal for various > > > stages of the producer request. The producer provided call-backs for > > > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at > > > LinkedIn we in fact did auditing within these call-backs (and not > > > explicitly in the wrapper). Over time and with 0.8 we moved that out to > > the > > > wrapper libraries. > > > > > > On a side-note while audit and other monitoring can be done internally > > in a > > > convenient way I think it should be clarified that having a wrapper is > in > > > general not a bad idea and I would even consider it to be a > > best-practice. 
> > > Even with 0.7 we still had a wrapper library and that API has largely > > > stayed the same and has helped protect against (sometimes backwards > > > incompatible) changes in open source. > > > > > > While we are on this topic I have one comment and Anna, you may have > > > already considered this but I don't see mention of it in the KIP: > > > > > > Add a custom message interceptor/validator on the broker on message > > > arrival. > > > > > > We decompress and do basic validation of messages on arrival. I think > > there > > > is value in supporting custom validation and expanding it to support > custom > > > on-arrival processing. Here is a specific use-case I have in mind. The > > blog > > > that James referenced describes our auditing infrastructure. In order > to > > > audit the Kafka cluster itself we need to run a "console auditor" > service > > > that consumes everything and spits out audit events back to the > cluster. > I > > > would prefer not having to run this service because: > > > > > >- Well, it is one more service that we have to run and monitor > > >- Consuming everything takes up bandwidth which can be avoided > > >- The console auditor consumer itself can lag and cause temporary > > audit > > >discrepancies > > > > > > One way we can mitigate this is by having mirror-makers in between > > clusters > > > emit audit events. The problem is that the very last cluster in the > > > pipeline will not have any audit, which is why we need to have something > > to audit the cluster.
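To make the proposal above concrete, here is a rough sketch of an interceptor using the proposed callback to track accumulator queue time. Everything below is illustrative only: onDequeued() is just the signature proposed in this thread, QueueTimeInterceptor is a made-up name, and the rest of the interceptor surface is assumed from the KIP draft.

    import org.apache.kafka.common.TopicPartition;

    // Sketch only: onDequeued() is the API proposed in this thread, not a
    // committed interface; the other KIP-42 callbacks are omitted here.
    public class QueueTimeInterceptor implements ProducerInterceptor<byte[], byte[]> {
        public void onDequeued(TopicPartition tp, long appendTime, int attempts,
                               boolean moreQueued) {
            // Time this record spent sitting in the RecordAccumulator.
            long queuedMs = System.currentTimeMillis() - appendTime;
            // Feed queuedMs into whatever metrics library the application uses.
        }
    }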
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
I think there is some tension here between exposing lots of hooks that you can implement and giving a simple, supportable interface. Personally I would argue for not adding onDequeued() and instead removing both onEnqueued() and onReceived(). My argument is that onSend and onAcknowledgment are fundamental aspects of the producer protocol. This means they are easy to understand, and they don't depend on our implementation at all. I think this implies we can commit to the API over the long haul. The same is true for the consumer for onConsume and onCommit. For example, whether onConsume should take the serialized or deserialized value is very dependent on whether we choose to do serialization lazily or not, which is still totally a matter of debate. I think maybe the best way to figure this out would be to try to come up with a set of concrete use cases that justify each method. And then we can think if it is worth it to include or not. I am a little wary of arguments along the lines of "it could be useful" because of course you could say that about any point in the code path. For onDequeued, I think we already have the queue time metric in the producer, right? I think that provides insight into the time spent in the queue without a need for a plugin. (I may have misunderstood the use case, though.) Are there other use cases? -Jay On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat < gharatmayures...@gmail.com> wrote: > Nice KIP. Excellent idea. > Was just thinking if we can add onDequeued() to the ProducerInterceptor > interface. Since we have the onEnqueued(), it will help the client or the > tools to know how much time the message spent in the RecordAccumulator. > Also an API to check if there are any messages left for a particular topic > in the RecordAccumulator would help. > > Thanks, > > Mayuresh > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino wrote: > > > Great idea. I’ve been talking about this for 2 years, and I’m glad > someone > > is finally picking it up. Will take a look at the KIP at some point > > shortly. > > > > -Todd > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps wrote: > > > > > Hey Becket, > > > > > > Yeah this is really similar to the callback. The difference is really > in > > > who sets the behavior. The idea of the interceptor is that it doesn't > > > require any code change in apps so you can globally add behavior to > your > > > Kafka usage without changing app code. Whereas the callback is added by > > the > > > app. The idea is to kind of obviate the need for the wrapper code that > > e.g. > > > LinkedIn maintains to hold this kind of stuff. > > > > > > -Jay > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin > > wrote: > > > > > > > This could be a useful feature. And I think there are some use cases to > > > > mutate the data, like the one mentioned in the rejected alternatives. > > > > > > > > I am wondering if there is functional overlapping between > > > > ProducerInterceptor.onAcknowledgement() and the producer callback? I > > can > > > > see that the Callback could be a per record setting while > > > > onAcknowledgement() is a producer level setting. Other than that, is > > > there > > > > any difference between them? > > > > > > > > Thanks, > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede > > > wrote: > > > > > > > > > James, > > > > > > > > > > That is one of the many monitoring use cases for the interceptor > > > > interface. 
> > > > > > > > > > Thanks, > > > > > Neha > > > > > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng > > wrote: > > > > > > > > > > > Anna, > > > > > > > > > > > > I'm trying to understand a concrete use case. It sounds like > > producer > > > > > > interceptors could be used to implement part of LinkedIn's Kafka > > > Audit > > > > > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale > > > > > > > > > > > > Part of that is done by a wrapper library around the Kafka producer > > > > that > > > > > > keeps a count of the number of messages produced, and then sends > > that > > > > > count > > > > > > to a side-topic. It sounds like the producer interceptors could > > > > possibly > > > > > be > > > > > > used to implement that? > > > > > > > > > > > > -James > > > > > > > > > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner > > > wrote: > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > I just created KIP-42 for adding producer and consumer > > > interceptors > > > > > for > > > > > > > intercepting messages at different points on producer and > > consumer. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors > > > > > > > > > > > > > > Comments and suggestions are welcome! > > > > > > > >
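To make Jay's argument concrete, the pared-down surface he describes might look roughly like this. This is a sketch, not settled API; in particular, whether onSend may return a modified record is still open in this thread.

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.Configurable;

    // Only protocol-level events: a record is handed to the producer, and the
    // broker acknowledges (or fails) it. No producer internals leak out, so the
    // interface can survive implementation changes.
    public interface ProducerInterceptor<K, V> extends Configurable {
        ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
        void onAcknowledgement(RecordMetadata metadata, Exception exception);
        void close();
    }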
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi Joel, Yes, we considered interceptors on the broker side -- they make a lot of sense and would add more detail to monitoring. We propose to do them later in a separate KIP because: 1) broker interceptors are riskier, since brokers are more sensitive to overheads; 2) producer and consumer interceptors alone already enable end-to-end monitoring, and thus offer a better reward vs. risk tradeoff; 3) once producer and consumer interceptors are in, we will gain experience with how they are actually used; 4) with that experience, we can start working on broker interceptors as a separate KIP. I added broker interceptors to the Rejected Alternatives section (out-of-scope for this KIP). Let me know your thoughts. Thanks, Anna On Mon, Jan 25, 2016 at 7:37 PM, Jay Kreps wrote: > I think there is some tension here between exposing lots of hooks that you > can implement and giving a simple, supportable interface. > > Personally I would argue for not adding onDequeued() and instead > removing both onEnqueued() and onReceived(). > > My argument is that onSend and onAcknowledgment are fundamental aspects of > the producer protocol. This means they are easy to understand, and they > don't depend on our implementation at all. I think this implies we can > commit to the API over the long haul. The same is true for the consumer for > onConsume and onCommit. > > For example, whether onConsume should take the serialized or deserialized > value is very dependent on whether we choose to do serialization lazily or > not, which is still totally a matter of debate. > > I think maybe the best way to figure this out would be to try to come up > with a set of concrete use cases that justify each method. And then we can > think if it is worth it to include or not. I am a little wary of arguments > along the lines of "it could be useful" because of course you could say > that about any point in the code path. > > For onDequeued, I think we already have the queue time metric in the > producer, right? I think that provides insight into the time spent in the > queue without a need for a plugin. (I may have misunderstood the use > case, though.) Are there other use cases? > > -Jay > > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat < > gharatmayures...@gmail.com> wrote: > > > Nice KIP. Excellent idea. > > Was just thinking if we can add onDequeued() to the ProducerInterceptor > > interface. Since we have the onEnqueued(), it will help the client or the > > tools to know how much time the message spent in the RecordAccumulator. > > Also an API to check if there are any messages left for a particular > topic > > in the RecordAccumulator would help. > > > > Thanks, > > > > Mayuresh > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino wrote: > > > > > Great idea. I’ve been talking about this for 2 years, and I’m glad > > someone > > > is finally picking it up. Will take a look at the KIP at some point > > > shortly. > > > > > > -Todd > > > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps wrote: > > > > > > > Hey Becket, > > > > > > > > Yeah this is really similar to the callback. The difference is really > > in > > > > who sets the behavior. The idea of the interceptor is that it doesn't > > > > require any code change in apps so you can globally add behavior to > > your > > > > Kafka usage without changing app code. Whereas the callback is added > by > > > the > > > > app. The idea is to kind of obviate the need for the wrapper code > that > > > e.g. > > > > LinkedIn maintains to hold this kind of stuff. 
> > > > > > > > -Jay > > > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin > > > wrote: > > > > > > > > > This could be a useful feature. And I think there are some use > cases > > to > > > > > mutate the data, like the one mentioned in the rejected alternatives. > > > > > > > > > > I am wondering if there is functional overlapping between > > > > > ProducerInterceptor.onAcknowledgement() and the producer callback? > I > > > can > > > > > see that the Callback could be a per record setting while > > > > > onAcknowledgement() is a producer level setting. Other than that, > is > > > > there > > > > > any difference between them? > > > > > > > > > > Thanks, > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede > > > > wrote: > > > > > > > > > > > James, > > > > > > > > > > > > That is one of the many monitoring use cases for the interceptor > > > > > interface. > > > > > > > > > > > > Thanks, > > > > > > Neha > > > > > > > > > > > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng > > > wrote: > > > > > > > > > > > > > Anna, > > > > > > > > > > > > > > I'm trying to understand a concrete use case. It sounds like > > > producer > > > > > > > interceptors could be used to implement part of LinkedIn's > Kafka > > > > Audit > > > > > > > tool? >
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hey Joel, What is the interface you are thinking of? Something like this: onAppend(String topic, int partition, Records records, long time) ? One challenge right now is that we are still using the old Message/MessageSet classes on the broker, which I'm not sure we'd want to support over the long haul, but it might be okay just to create the records instance for this interface. -Jay On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy wrote: > I'm definitely in favor of having such hooks in the produce/consume > life-cycle. Not sure if people remember this but in Kafka 0.7 this was > pretty much how it was: > > https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala > i.e., we had something similar to the interceptor proposal for various > stages of the producer request. The producer provided call-backs for > beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at > LinkedIn we in fact did auditing within these call-backs (and not > explicitly in the wrapper). Over time and with 0.8 we moved that out to the > wrapper libraries. > > On a side-note, while audit and other monitoring can be done internally in a > convenient way, I think it should be clarified that having a wrapper is in > general not a bad idea and I would even consider it to be a best-practice. > Even with 0.7 we still had a wrapper library and that API has largely > stayed the same and has helped protect against (sometimes backwards > incompatible) changes in open source. > > While we are on this topic I have one comment and Anna, you may have > already considered this but I don't see mention of it in the KIP: > > Add a custom message interceptor/validator on the broker on message > arrival. > > We decompress and do basic validation of messages on arrival. I think there > is value in supporting custom validation and expanding it to support custom > on-arrival processing. Here is a specific use-case I have in mind. The blog > that James referenced describes our auditing infrastructure. In order to > audit the Kafka cluster itself we need to run a "console auditor" service > that consumes everything and spits out audit events back to the cluster. I > would prefer not having to run this service because: > >- Well, it is one more service that we have to run and monitor >- Consuming everything takes up bandwidth which can be avoided >- The console auditor consumer itself can lag and cause temporary audit >discrepancies > > One way we can mitigate this is by having mirror-makers in between clusters > emit audit events. The problem is that the very last cluster in the > pipeline will not have any audit, which is why we need to have something to > audit the cluster. > > If we had a custom message validator then the audit can be done on-arrival > and we won't need a console auditor. > > One potential issue in this approach, and any elaborate on-arrival > processing for that matter, is that you may need to deserialize the message > as well, which can drive up produce request handling times. However I'm not > terribly concerned about that, especially if the audit header can be > separated out easily or even deserialized partially, as this Avro thread > touches on > > http://search-hadoop.com/m/F2svI1HDLY12W8tnH1=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+ > > Thanks, > > Joel > > On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat < > gharatmayures...@gmail.com> wrote: > > > Nice KIP. Excellent idea. 
> > Was just thinking if we can add onDequeued() to the ProducerInterceptor > > interface. Since we have the onEnqueued(), it will help the client or the > > tools to know how much time the message spent in the RecordAccumulator. > > Also an API to check if there are any messages left for a particular > topic > > in the RecordAccumulator would help. > > > > Thanks, > > > > Mayuresh > > > > On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino wrote: > > > > > Great idea. I’ve been talking about this for 2 years, and I’m glad > > someone > > > is finally picking it up. Will take a look at the KIP at some point > > > shortly. > > > > > > -Todd > > > > > > > > > On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps wrote: > > > > > > > Hey Becket, > > > > > > > > Yeah this is really similar to the callback. The difference is really > > in > > > > who sets the behavior. The idea of the interceptor is that it doesn't > > > > require any code change in apps so you can globally add behavior to > > your > > > > Kafka usage without changing app code. Whereas the callback is added > by > > > the > > > > app. The idea is to kind of obviate the need for the wrapper code > that > > > e.g. > > > > LinkedIn maintains to hold this kind of stuff. > > > > > > > > -Jay > > > > > > > > On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin > > > wrote: > > > > > > > > > This could be a useful feature. And I think there are some use >
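As a strawman for the interface question above, the broker-side hook could be as small as the following. Nothing like this exists today; the names are placeholders, and Records is borrowed from the client-side classes since, as noted above, the broker still uses the old Message/MessageSet classes.

    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.common.record.Records;

    // Hypothetical broker-side interceptor for on-arrival processing; all
    // names and types here are placeholders for discussion, not real API.
    public interface BrokerInterceptor extends Configurable {
        // Called after decompression and basic validation, before the messages
        // are appended to the log; an implementation could emit audit events
        // here instead of running a separate console auditor.
        void onAppend(String topic, int partition, Records records, long timeMs);
    }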
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
This could be a useful feature. And I think there are some use cases to mutate the data, like the one mentioned in the rejected alternatives. I am wondering if there is functional overlapping between ProducerInterceptor.onAcknowledgement() and the producer callback? I can see that the Callback could be a per record setting while onAcknowledgement() is a producer level setting. Other than that, is there any difference between them? Thanks, Jiangjie (Becket) Qin On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede wrote: > James, > > That is one of the many monitoring use cases for the interceptor interface. > > Thanks, > Neha > > On Fri, Jan 22, 2016 at 6:18 PM, James Cheng wrote: > > > Anna, > > > > I'm trying to understand a concrete use case. It sounds like producer > > interceptors could be used to implement part of LinkedIn's Kafka Audit > > tool? https://engineering.linkedin.com/kafka/running-kafka-scale > > > > Part of that is done by a wrapper library around the Kafka producer that > > keeps a count of the number of messages produced, and then sends that > count > > to a side-topic. It sounds like the producer interceptors could possibly > be > > used to implement that? > > > > -James > > > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner wrote: > > > > > > Hi, > > > > > > I just created KIP-42 for adding producer and consumer interceptors > for > > > intercepting messages at different points on producer and consumer. > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors > > > > > > Comments and suggestions are welcome! > > > > > > Thanks, > > > Anna > > > -- > Thanks, > Neha >
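The difference Becket asks about is mostly one of scope and wiring, which a short sketch shows. This assumes a producer and record already in scope; the interceptor.classes key is the configuration shape the KIP proposes, and AuditInterceptor is a made-up class name.

    import java.util.Properties;

    // Per-record Callback: chosen by the application at each send() call site.
    producer.send(record, (metadata, exception) -> {
        if (exception != null) exception.printStackTrace();
    });

    // Interceptor: configured once on the producer and applied to every record,
    // with no change to application call sites.
    Properties props = new Properties();
    props.put("interceptor.classes", "com.example.AuditInterceptor");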
[DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Hi, I just created KIP-42 for adding producer and consumer interceptors for intercepting messages at different points on the producer and consumer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors Comments and suggestions are welcome! Thanks, Anna
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
Anna, I'm trying to understand a concrete use case. It sounds like producer interceptors could be used to implement part of LinkedIn's Kafka Audit tool? https://engineering.linkedin.com/kafka/running-kafka-scale Part of that is done by a wrapper library around the Kafka producer that keeps a count of the number of messages produced, and then sends that count to a side-topic. It sounds like the producer interceptors could possibly be used to implement that? -James > On Jan 22, 2016, at 4:33 PM, Anna Povzner wrote: > > Hi, > > I just created KIP-42 for adding producer and consumer interceptors for > intercepting messages at different points on producer and consumer. > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors > > Comments and suggestions are welcome! > > Thanks, > Anna
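If it helps to see the counting half of that wrapper as an interceptor, here is a sketch against the KIP-42 surface as proposed. The class name and the flush mechanism are made up, only onSend() carries the logic, and configure()/close() are omitted.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Counts produced messages per topic, like the audit wrapper described above.
    public class CountingInterceptor implements ProducerInterceptor<String, String> {
        private final Map<String, LongAdder> counts = new ConcurrentHashMap<>();

        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            counts.computeIfAbsent(record.topic(), t -> new LongAdder()).increment();
            return record; // pass the record through unmodified
        }

        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // no-op here; an implementation could count acks or failures instead
        }

        // A scheduled task (not shown) would periodically snapshot the counts and
        // send them to the audit side-topic with a separate producer.
    }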
Re: [DISCUSS] KIP-42: Add Producer and Consumer Interceptors
James, That is one of the many monitoring use cases for the interceptor interface. Thanks, Neha On Fri, Jan 22, 2016 at 6:18 PM, James Cheng wrote: > Anna, > > I'm trying to understand a concrete use case. It sounds like producer > interceptors could be used to implement part of LinkedIn's Kafka Audit > tool? https://engineering.linkedin.com/kafka/running-kafka-scale > > Part of that is done by a wrapper library around the Kafka producer that > keeps a count of the number of messages produced, and then sends that count > to a side-topic. It sounds like the producer interceptors could possibly be > used to implement that? > > -James > > > On Jan 22, 2016, at 4:33 PM, Anna Povzner wrote: > > > > Hi, > > > > I just created KIP-42 for adding producer and consumer interceptors for > > intercepting messages at different points on producer and consumer. > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors > > > > Comments and suggestions are welcome! > > > > Thanks, > > Anna > -- Thanks, Neha