One use case we would like to support is requiring that producers send compressed messages. Would this KIP (or KIP-686) allow the broker to detect that? From looking at both KIPs, it doesn't look like either would help with my particular use case. Both KIPs operate at the record level.
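To make the record-level concern concrete, here is a minimal sketch built on the BrokerRecordValidator signature quoted further down in this thread. It is only an illustration under stated assumptions: TopicPartition, Header, and InvalidRecordException below are simplified local stand-ins rather than the real Kafka classes, and NonEmptyValueValidator and ValidatorDemo are hypothetical names. As far as I can tell, a validator of this shape receives only the key, value, and headers of each record, while compression is an attribute of the enclosing record batch in the current message format, so there is nothing in these arguments it could check.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Local stand-ins so the sketch compiles on its own (NOT the real Kafka classes).
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class Header {
    final String key;
    final byte[] value;
    Header(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

class InvalidRecordException extends RuntimeException {
    InvalidRecordException(String message) {
        super(message);
    }
}

// Signature as proposed in the quoted thread; not a released Kafka API.
interface BrokerRecordValidator {
    Optional<InvalidRecordException> validateRecord(
            TopicPartition topicPartition, ByteBuffer key, ByteBuffer value, Header[] headers);
}

// Hypothetical validator: rejects records whose value is null or empty.
class NonEmptyValueValidator implements BrokerRecordValidator {
    @Override
    public Optional<InvalidRecordException> validateRecord(
            TopicPartition topicPartition, ByteBuffer key, ByteBuffer value, Header[] headers) {
        // Only per-record fields are available here. Nothing describes the
        // enclosing batch, so the compression codec cannot be inspected.
        if (value == null || !value.hasRemaining()) {
            return Optional.of(new InvalidRecordException(
                    "Empty value for " + topicPartition.topic + "-" + topicPartition.partition));
        }
        return Optional.empty();
    }
}

class ValidatorDemo {
    public static void main(String[] args) {
        BrokerRecordValidator validator = new NonEmptyValueValidator();
        TopicPartition tp = new TopicPartition("orders", 0);
        ByteBuffer payload = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));

        // A record with a non-empty value passes validation.
        System.out.println(validator.validateRecord(tp, null, payload, new Header[0]).isPresent());

        // A record with an empty value is rejected with an explanatory message.
        validator.validateRecord(tp, null, ByteBuffer.allocate(0), new Header[0])
                .ifPresent(e -> System.out.println(e.getMessage()));
    }
}
```

Enforcing compression would presumably need a hook that sees batch-level metadata, which neither proposal appears to expose.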
Thanks,
-James

> On Jun 30, 2021, at 10:05 AM, Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
>
> Hi Nikolay,
> Great to hear that. I'm ok with either one too.
> I had missed noticing the KIP-686. Thanks for bringing it up.
>
> I have tried to keep this one simple, but hope it can cover all our
> enterprise needs.
>
> Should we put this one for vote?
>
> Regards,
> Soumyajit
>
>
> On Wed, Jun 30, 2021, 8:50 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
>
>> Team, If we have support from committers for API to check records on the
>> broker side let’s choose one KIP to go with and move forward to vote and
>> implementation?
>> I’m ready to drive the implementation of this API.
>> It seems very useful to me.
>>
>>> On Jun 30, 2021, at 18:04, Nikolay Izhikov <nizhikov....@gmail.com> wrote:
>>>
>>> Hello.
>>>
>>> I had a very similar proposal [1].
>>> So, yes, I think we should have one implementation of API in the product.
>>>
>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
>>>
>>>> On Jun 30, 2021, at 17:57, Christopher Shannon <christopher.l.shan...@gmail.com> wrote:
>>>>
>>>> I would find this feature very useful as well as adding custom validation
>>>> to incoming records would be nice to prevent bad data from making it to the
>>>> topic.
>>>>
>>>> On Wed, Apr 7, 2021 at 7:03 PM Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
>>>>
>>>>> Thanks Colin! Good call on the ApiRecordError. We could use
>>>>> InvalidRecordException instead, and have the broker convert it
>>>>> to ApiRecordError.
>>>>> Modified signature below.
>>>>>
>>>>> interface BrokerRecordValidator {
>>>>>    /**
>>>>>     * Validate the record for a given topic-partition.
>>>>>     */
>>>>>    Optional<InvalidRecordException> validateRecord(TopicPartition topicPartition,
>>>>>        ByteBuffer key, ByteBuffer value, Header[] headers);
>>>>> }
>>>>>
>>>>> On Tue, Apr 6, 2021 at 5:09 PM Colin McCabe <cmcc...@apache.org> wrote:
>>>>>
>>>>>> Hi Soumyajit,
>>>>>>
>>>>>> The difficult thing is deciding which fields to share and how to share
>>>>>> them. Key and value are probably the minimum we need to make this useful.
>>>>>> If we do choose to go with byte buffer, it is not necessary to also pass
>>>>>> the size, since ByteBuffer maintains that internally.
>>>>>>
>>>>>> ApiRecordError is also an internal class, so it can't be used in a public
>>>>>> API. I think most likely if we were going to do this, we would just catch
>>>>>> an exception and use the exception text as the validation error.
>>>>>>
>>>>>> best,
>>>>>> Colin
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 6, 2021, at 15:57, Soumyajit Sahu wrote:
>>>>>>> Hi Tom,
>>>>>>>
>>>>>>> Makes sense. Thanks for the explanation. I get what Colin had meant earlier.
>>>>>>>
>>>>>>> Would a different signature for the interface work? Example below, but
>>>>>>> please feel free to suggest alternatives if there are any possibilities of
>>>>>>> such.
>>>>>>>
>>>>>>> If needed, then deprecating this and introducing a new signature would be
>>>>>>> straight-forward as both (old and new) calls could be made serially in the
>>>>>>> LogValidator allowing a coexistence for a transition period.
>>>>>>>
>>>>>>> interface BrokerRecordValidator {
>>>>>>>    /**
>>>>>>>     * Validate the record for a given topic-partition.
>>>>>>>     */
>>>>>>>    Optional<ApiRecordError> validateRecord(TopicPartition topicPartition,
>>>>>>>        int keySize, ByteBuffer key, int valueSize, ByteBuffer value,
>>>>>>>        Header[] headers);
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 6, 2021 at 12:54 AM Tom Bentley <tbent...@redhat.com> wrote:
>>>>>>>
>>>>>>>> Hi Soumyajit,
>>>>>>>>
>>>>>>>> Although that class does indeed have public access at the Java level, it
>>>>>>>> does so only because it needs to be used by internal Kafka code which lives
>>>>>>>> in other packages (there isn't any more restrictive access modifier which
>>>>>>>> would work). What the project considers public Java API is determined by
>>>>>>>> what's included in the published Javadocs:
>>>>>>>> https://kafka.apache.org/27/javadoc/index.html, which doesn't include the
>>>>>>>> org.apache.kafka.common.record package.
>>>>>>>>
>>>>>>>> One of the problems with making these internal classes public is it ties
>>>>>>>> the project into supporting them as APIs, which can make changing them much
>>>>>>>> harder and in the long run that can slow, or even prevent, innovation in
>>>>>>>> the rest of Kafka.
>>>>>>>>
>>>>>>>> Kind regards,
>>>>>>>>
>>>>>>>> Tom
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Apr 4, 2021 at 7:31 PM Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Colin,
>>>>>>>>> I see that both the interface "Record" and the implementation
>>>>>>>>> "DefaultRecord" being used in LogValidator.java are public
>>>>>>>>> interfaces/classes.
>>>>>>>>>
>>>>>>>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/Records.java
>>>>>>>>> and
>>>>>>>>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
>>>>>>>>>
>>>>>>>>> So, it should be ok to use them. Let me know what you think.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Soumyajit
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Apr 2, 2021 at 8:51 AM Colin McCabe <cmcc...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>>
>>>>>>>>>> I believe we've had discussions about proposals similar to this before,
>>>>>>>>>> although I'm having trouble finding one right now. The issue here is that
>>>>>>>>>> Record is a private class -- it is not part of any public API, and may
>>>>>>>>>> change at any time. So we can't expose it in public APIs.
>>>>>>>>>>
>>>>>>>>>> best,
>>>>>>>>>> Colin
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 1, 2021, at 14:18, Soumyajit Sahu wrote:
>>>>>>>>>>> Hello All,
>>>>>>>>>>> I would like to start a discussion on the KIP-729.
>>>>>>>>>>>
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-729%3A+Custom+validation+of+records+on+the+broker+prior+to+log+append
>>>>>>>>>>>
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Soumyajit