One use case we would like is to require that producers are sending compressed 
messages. Would this KIP (or KIP-686) allow the broker to detect that? From 
looking at both KIPs, it doesn't look it would help with my particular use 
case. Both of the KIPs are at the Record-level.

Thanks,
-James

> On Jun 30, 2021, at 10:05 AM, Soumyajit Sahu <soumyajit.s...@gmail.com> wrote:
> 
> Hi Nikolay,
> Great to hear that. I'm ok with either one too.
> I had missed noticing the KIP-686. Thanks for bringing it up.
> 
> I have tried to keep this one simple, but hope it can cover all our
> enterprise needs.
> 
> Should we put this one for vote?
> 
> Regards,
> Soumyajit
> 
> 
> On Wed, Jun 30, 2021, 8:50 AM Nikolay Izhikov <nizhi...@apache.org> wrote:
> 
>> Team, If we have support from committers for API to check records on the
>> broker side let’s choose one KIP to go with and move forward to vote and
>> implementation?
>> I’m ready to drive implementation of this API.
>> 
>> I’m ready to drive the implementation of this API.
>> It seems very useful to me.
>> 
>>> 30 июня 2021 г., в 18:04, Nikolay Izhikov <nizhikov....@gmail.com>
>> написал(а):
>>> 
>>> Hello.
>>> 
>>> I had a very similar proposal [1].
>>> So, yes, I think we should have one implementation of API in the product.
>>> 
>>> [1]
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
>>> 
>>>> 30 июня 2021 г., в 17:57, Christopher Shannon <
>> christopher.l.shan...@gmail.com> написал(а):
>>>> 
>>>> I would find this feature very useful as well as adding custom
>> validation
>>>> to incoming records would be nice to prevent bad data from making it to
>> the
>>>> topic.
>>>> 
>>>> On Wed, Apr 7, 2021 at 7:03 PM Soumyajit Sahu <soumyajit.s...@gmail.com
>>> 
>>>> wrote:
>>>> 
>>>>> Thanks Colin! Good call on the ApiRecordError. We could use
>>>>> InvalidRecordException instead, and have the broker convert it
>>>>> to ApiRecordError.
>>>>> Modified signature below.
>>>>> 
>>>>> interface BrokerRecordValidator {
>>>>> /**
>>>>>  * Validate the record for a given topic-partition.
>>>>>  */
>>>>>  Optional<InvalidRecordException> validateRecord(TopicPartition
>>>>> topicPartition, ByteBuffer key, ByteBuffer value, Header[] headers);
>>>>> }
>>>>> 
>>>>> On Tue, Apr 6, 2021 at 5:09 PM Colin McCabe <cmcc...@apache.org>
>> wrote:
>>>>> 
>>>>>> Hi Soumyajit,
>>>>>> 
>>>>>> The difficult thing is deciding which fields to share and how to share
>>>>>> them.  Key and value are probably the minimum we need to make this
>>>>> useful.
>>>>>> If we do choose to go with byte buffer, it is not necessary to also
>> pass
>>>>>> the size, since ByteBuffer maintains that internally.
>>>>>> 
>>>>>> ApiRecordError is also an internal class, so it can't be used in a
>> public
>>>>>> API.  I think most likely if we were going to do this, we would just
>>>>> catch
>>>>>> an exception and use the exception text as the validation error.
>>>>>> 
>>>>>> best,
>>>>>> Colin
>>>>>> 
>>>>>> 
>>>>>> On Tue, Apr 6, 2021, at 15:57, Soumyajit Sahu wrote:
>>>>>>> Hi Tom,
>>>>>>> 
>>>>>>> Makes sense. Thanks for the explanation. I get what Colin had meant
>>>>>> earlier.
>>>>>>> 
>>>>>>> Would a different signature for the interface work? Example below,
>> but
>>>>>>> please feel free to suggest alternatives if there are any
>> possibilities
>>>>>> of
>>>>>>> such.
>>>>>>> 
>>>>>>> If needed, then deprecating this and introducing a new signature
>> would
>>>>> be
>>>>>>> straight-forward as both (old and new) calls could be made serially
>> in
>>>>>> the
>>>>>>> LogValidator allowing a coexistence for a transition period.
>>>>>>> 
>>>>>>> interface BrokerRecordValidator {
>>>>>>>  /**
>>>>>>>   * Validate the record for a given topic-partition.
>>>>>>>   */
>>>>>>>  Optional<ApiRecordError> validateRecord(TopicPartition
>>>>>> topicPartition,
>>>>>>> int keySize, ByteBuffer key, int valueSize, ByteBuffer value,
>> Header[]
>>>>>>> headers);
>>>>>>> }
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Apr 6, 2021 at 12:54 AM Tom Bentley <tbent...@redhat.com>
>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi Soumyajit,
>>>>>>>> 
>>>>>>>> Although that class does indeed have public access at the Java
>> level,
>>>>>> it
>>>>>>>> does so only because it needs to be used by internal Kafka code
>> which
>>>>>> lives
>>>>>>>> in other packages (there isn't any more restrictive access modifier
>>>>>> which
>>>>>>>> would work). What the project considers public Java API is
>> determined
>>>>>> by
>>>>>>>> what's included in the published Javadocs:
>>>>>>>> https://kafka.apache.org/27/javadoc/index.html, which doesn't
>>>>> include
>>>>>> the
>>>>>>>> org.apache.kafka.common.record package.
>>>>>>>> 
>>>>>>>> One of the problems with making these internal classes public is it
>>>>>> ties
>>>>>>>> the project into supporting them as APIs, which can make changing
>>>>> them
>>>>>> much
>>>>>>>> harder and in the long run that can slow, or even prevent,
>> innovation
>>>>>> in
>>>>>>>> the rest of Kafka.
>>>>>>>> 
>>>>>>>> Kind regards,
>>>>>>>> 
>>>>>>>> Tom
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Sun, Apr 4, 2021 at 7:31 PM Soumyajit Sahu <
>>>>>> soumyajit.s...@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Colin,
>>>>>>>>> I see that both the interface "Record" and the implementation
>>>>>>>>> "DefaultRecord" being used in LogValidator.java are public
>>>>>>>>> interfaces/classes.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/Records.java
>>>>>>>>> and
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
>>>>>>>>> 
>>>>>>>>> So, it should be ok to use them. Let me know what you think.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Soumyajit
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Fri, Apr 2, 2021 at 8:51 AM Colin McCabe <cmcc...@apache.org>
>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Soumyajit,
>>>>>>>>>> 
>>>>>>>>>> I believe we've had discussions about proposals similar to this
>>>>>> before,
>>>>>>>>>> although I'm having trouble finding one right now.  The issue
>>>>> here
>>>>>> is
>>>>>>>>> that
>>>>>>>>>> Record is a private class -- it is not part of any public API,
>>>>> and
>>>>>> may
>>>>>>>>>> change at any time.  So we can't expose it in public APIs.
>>>>>>>>>> 
>>>>>>>>>> best,
>>>>>>>>>> Colin
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Thu, Apr 1, 2021, at 14:18, Soumyajit Sahu wrote:
>>>>>>>>>>> Hello All,
>>>>>>>>>>> I would like to start a discussion on the KIP-729.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>> 
>>>>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-729%3A+Custom+validation+of+records+on+the+broker+prior+to+log+append
>>>>>>>>>>> 
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Soumyajit
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>> 
>> 
>> 

Reply via email to