Hello, Paul.
Thanks for the feedback!
> How does the producer get notified of a failure to pass the RecordPolicy for
> one or more records,
The producer will receive a `PolicyViolationException`.
> how should it recover?
The obvious answers are: the producer should switch to the correct schema, or
the producer should be stopped abnormally.
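A minimal sketch of those two recovery options follows. It assumes the broker's rejection reaches the application as an exception. `PolicyViolation`, `Broker`, `plain`, and `withSchemaMarker` are hypothetical stand-ins, not the Kafka producer API. With the real `KafkaProducer`, the error would arrive through the send callback or the returned `Future` instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class PolicyRecoverySketch {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.PolicyViolationException. */
    static final class PolicyViolation extends RuntimeException {
        PolicyViolation(String message) { super(message); }
    }

    /** Hypothetical broker endpoint that may reject a payload failing the topic policy. */
    interface Broker {
        void append(String topic, byte[] payload);
    }

    /** Hypothetical serializer that ignores the schema the topic expects. */
    static byte[] plain(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Hypothetical "correct schema" serializer: prefixes a marker byte. */
    static byte[] withSchemaMarker(String value) {
        byte[] body = plain(value);
        byte[] out = new byte[body.length + 1];
        out[0] = 1;
        System.arraycopy(body, 0, out, 1, body.length);
        return out;
    }

    /**
     * Sends with the current serializer. On a policy violation, switches to the
     * fallback serializer and retries once. If that is rejected too, the exception
     * propagates and the caller is expected to stop the producer.
     */
    static String sendWithRecovery(Broker broker, String topic, String value,
                                   Function<String, byte[]> serializer,
                                   Function<String, byte[]> fallback) {
        try {
            broker.append(topic, serializer.apply(value));
            return "sent";
        } catch (PolicyViolation rejected) {
            broker.append(topic, fallback.apply(value)); // may throw again: stop abnormally
            return "sent-after-switch";
        }
    }
}
```

Retrying only once keeps a misconfigured producer from looping on records the broker will never accept.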
> Assuming a RecordPolicy can be loaded by a broker without restarting it, what
> is the mechanism by which this happens?
Thanks, that's a good question.
I think we should choose one of the following alternatives:
1. We allow users to use any `RecordsPolicy` implementation.
In this case, the Kafka administrator is responsible for putting a
custom jar with the `RecordsPolicy` implementation on every Kafka broker's
classpath (the libs directory).
AFAIK this was chosen as the base scenario for `Authorizer`
implementations.
2. We allow users to select an implementation from a predefined
list that Kafka developers include in a release.
In this case, every Kafka broker will have a specific
implementation from the Kafka release itself.
We could go with this because a faulty `RecordsPolicy`
implementation can affect broker stability and performance.
Personally, I prefer the first option.
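The difference between the two options can be sketched with plain JDK reflection. Option 1 instantiates any class found on the broker classpath. Option 2 additionally checks the class name against an allowlist fixed by the release. `RecordsPolicy` here is a simplified stand-in for the interface proposed in the KIP, and `IllegalArgumentException` stands in for `PolicyViolationException`. `NonEmptyValuePolicy`, `loadAny`, `loadBuiltIn`, and `BUILT_IN` are hypothetical, and this is not Kafka's actual plugin-loading code.

```java
import java.util.Set;

final class PolicyLoadingSketch {

    /** Simplified stand-in for the proposed RecordsPolicy interface. */
    interface RecordsPolicy {
        void validate(String topic, int partition, Iterable<byte[]> values);
    }

    /** Hypothetical built-in policy: rejects records with a missing or empty value. */
    public static final class NonEmptyValuePolicy implements RecordsPolicy {
        public NonEmptyValuePolicy() { }

        @Override
        public void validate(String topic, int partition, Iterable<byte[]> values) {
            for (byte[] value : values) {
                if (value == null || value.length == 0) {
                    throw new IllegalArgumentException("Empty value in " + topic + "-" + partition);
                }
            }
        }
    }

    /** Option 1: instantiate any class on the broker classpath (e.g. a jar in libs/). */
    static RecordsPolicy loadAny(String className) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className);
        if (!RecordsPolicy.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(className + " does not implement RecordsPolicy");
        }
        return (RecordsPolicy) clazz.getDeclaredConstructor().newInstance();
    }

    /** Option 2: accept only implementations shipped with the Kafka release. */
    static final Set<String> BUILT_IN = Set.of(NonEmptyValuePolicy.class.getName());

    static RecordsPolicy loadBuiltIn(String className) throws ReflectiveOperationException {
        if (!BUILT_IN.contains(className)) {
            throw new IllegalArgumentException(className + " is not a built-in RecordsPolicy");
        }
        return loadAny(className);
    }
}
```

Option 2 reuses the same instantiation path and only narrows which names are accepted. That is why it trades flexibility for a guarantee that only release-vetted code runs inside the broker.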
> Must writes to replicas also adhere to the RecordPolicy?
I think we should check only on the leader.
> Must already-written records adhere to RecordPolicy, if it is added
> later?
No.
> managing schema outside of kafka itself using something like the confluent
> schema registry.
> Maybe you can say why RecordPolicy would be better?
1. I can't agree that a commercial product is an alternative to the proposed
open-source API.
Moreover, the API I propose overlaps only slightly with a product as large as
Schema Registry as a whole.
2. AFAIU, Confluent Schema Registry uses a similar technique to enforce the
record schema in a topic.
My understanding is based on the Schema Registry docs [1]. Specifically:
- Confluent Schema Registry has custom topic configuration options to
enable or disable schema checks.
- "With this configuration, if a message is produced to the topic
my-topic-sv that does not have a valid schema for the value of the message, an
error is returned to the producer, and the message is discarded."
[1] https://docs.confluent.io/platform/current/schema-registry/schema-validation.html
> On 1 Dec 2020, at 06:15, Paul Whalen <[email protected]> wrote:
>
> Nikolay,
>
> I'm not a committer, but perhaps I can start the discussion. I've had the
> urge for a similar feature after being bitten by writing a poorly formed
> record to a topic - it's natural to want to push schema validation into the
> broker, since that's the way regular databases work. But I'm a bit
> skeptical of the complexity it introduces. Some questions I think would
> have to be answered that aren't currently in the KIP:
> - How does the producer get notified of a failure to pass the RecordPolicy
> for one or more records, and how should it recover?
> - Assuming a RecordPolicy can be loaded by a broker without restarting it,
> what is the mechanism by which this happens?
> - Must writes to replicas also adhere to the RecordPolicy?
> - Must already-written records adhere to RecordPolicy, if it is
> added later?
>
> Also, the rejected alternatives section is blank - I see the status quo as
> at least one alternative, in particular, managing schema outside of kafka
> itself using something like the confluent schema registry. Maybe you can
> say why RecordPolicy would be better?
>
> Best,
> Paul
>
> On Mon, Nov 30, 2020 at 9:58 AM Nikolay Izhikov <[email protected]> wrote:
>
>> Friendly bump.
>>
>> Please share your feedback.
>> Do we need this feature in Kafka?
>>
>>> On 23 Nov 2020, at 12:09, Nikolay Izhikov <[email protected]> wrote:
>>>
>>> Hello!
>>>
>>> Any additional feedback on this KIP?
>>> I believe this API can be useful for Kafka users.
>>>
>>>
>>>> On 18 Nov 2020, at 14:47, Nikolay Izhikov <[email protected]> wrote:
>>>>
>>>> Hello, Ismael.
>>>>
>>>> Thanks for the feedback.
>>>> You are right, I didn't read the public interfaces definition carefully :)
>>>>
>>>> I've updated the KIP to address your objection.
>>>> I propose to expose two new public interfaces:
>>>>
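>>>> ```
>>>> // Record.java
>>>> package org.apache.kafka.common;
>>>>
>>>> import java.nio.ByteBuffer;
>>>> import org.apache.kafka.common.header.Header;
>>>>
>>>> public interface Record {
>>>>     long timestamp();
>>>>
>>>>     boolean hasKey();
>>>>
>>>>     ByteBuffer key();
>>>>
>>>>     boolean hasValue();
>>>>
>>>>     ByteBuffer value();
>>>>
>>>>     Header[] headers();
>>>> }
>>>>
>>>> // RecordsPolicy.java
>>>> package org.apache.kafka.server.policy;
>>>>
>>>> import org.apache.kafka.common.Configurable;
>>>> import org.apache.kafka.common.Record;
>>>> import org.apache.kafka.common.errors.PolicyViolationException;
>>>>
>>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
>>>>     void validate(String topic, int partition, Iterable<? extends Record> records)
>>>>             throws PolicyViolationException;
>>>> }
>>>> ```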
>>>>
>>>> The data exposed in `Record` and in the `validate` method itself seems to be
>>>> enough to implement any reasonable policy.
>>>>
>>>>> On 17 Nov 2020, at 19:44, Ismael Juma <[email protected]> wrote:
>>>>>
>>>>> Thanks for the KIP. The policy interface is a small part of this. You
>>>>> also have to describe the new public API that will be exposed as part of
>>>>> this. For example, there is no public `Records` class.
>>>>>
>>>>> Ismael
>>>>>
>>>>> On Tue, Nov 17, 2020 at 8:24 AM Nikolay Izhikov <[email protected]> wrote:
>>>>>
>>>>>> Hello.
>>>>>>
>>>>>> I want to start a discussion of KIP-686 [1].
>>>>>> I propose to introduce a new public interface for it, `RecordsPolicy`:
>>>>>>
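>>>>>> ```
>>>>>> import org.apache.kafka.common.Configurable;
>>>>>> import org.apache.kafka.common.errors.PolicyViolationException;
>>>>>> import org.apache.kafka.common.record.Records;
>>>>>>
>>>>>> public interface RecordsPolicy extends Configurable, AutoCloseable {
>>>>>>     void validate(String topic, Records records) throws PolicyViolationException;
>>>>>> }
>>>>>> ```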
>>>>>>
>>>>>> and two new configuration options:
>>>>>> * `records.policy.class.name: String` - sets the class name of the
>>>>>> `RecordsPolicy` implementation for a specific topic.
>>>>>> * `records.policy.enabled: Boolean` - enables or disables the records
>>>>>> policy for the topic.
>>>>>>
>>>>>> If `records.policy.enabled=true`, an instance of `RecordsPolicy`
>>>>>> should check each `Records` batch before the data is appended to the log.
>>>>>> If `RecordsPolicy#validate` throws a `PolicyViolationException`, no data
>>>>>> is added to the log and the client receives an error.
>>>>>>
>>>>>> Motivation:
>>>>>>
>>>>>> When Kafka is adopted in large enterprises, it's important to guarantee
>>>>>> that data in a given topic conforms to a specific format.
>>>>>> When data is written and read by different applications developed by
>>>>>> different teams, it's hard to guarantee the data format using only a
>>>>>> custom SerDe, because malicious applications can use a different SerDe.
>>>>>> The data format can be enforced only on the broker side.
>>>>>>
>>>>>> Please share your feedback.
>>>>>>
>>>>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-686%3A+API+to+ensure+Records+policy+on+the+broker
>>>>
>>>
>>
>>
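To make the proposed contract concrete, here is a minimal sketch of a policy written against the revised interface. It is paired with the broker-side flow described in the original proposal: when `records.policy.enabled` is true the whole batch is validated, and a `PolicyViolationException` means nothing is appended. The types are simplified local stand-ins. `Record` omits headers, `PolicyViolationException` is a local unchecked exception, and `configure` is declared directly instead of via `Configurable`. `NonEmptyValuePolicy`, the `require.key` option, `append`, and `record` are hypothetical and not part of the KIP or of Kafka's broker code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

final class RecordsPolicySketch {

    /** Simplified stand-in for the proposed org.apache.kafka.common.Record (headers omitted). */
    interface Record {
        long timestamp();
        boolean hasKey();
        ByteBuffer key();
        boolean hasValue();
        ByteBuffer value();
    }

    /** Local stand-in for org.apache.kafka.common.errors.PolicyViolationException. */
    static final class PolicyViolationException extends RuntimeException {
        PolicyViolationException(String message) { super(message); }
    }

    /** Simplified stand-in for the proposed RecordsPolicy (Configurable folded in). */
    interface RecordsPolicy extends AutoCloseable {
        void configure(Map<String, ?> configs);

        void validate(String topic, int partition, Iterable<? extends Record> records);

        @Override
        default void close() { }
    }

    /** Hypothetical policy: non-empty values, plus keys if "require.key" is true. */
    static final class NonEmptyValuePolicy implements RecordsPolicy {
        private boolean requireKey;

        @Override
        public void configure(Map<String, ?> configs) {
            requireKey = Boolean.parseBoolean(String.valueOf(configs.get("require.key")));
        }

        @Override
        public void validate(String topic, int partition, Iterable<? extends Record> records) {
            for (Record r : records) {
                if (!r.hasValue() || r.value().remaining() == 0) {
                    throw new PolicyViolationException("Empty value in " + topic + "-" + partition);
                }
                if (requireKey && !r.hasKey()) {
                    throw new PolicyViolationException("Missing key in " + topic + "-" + partition);
                }
            }
        }
    }

    /** Hypothetical append path: validate the whole batch first, then write it. */
    static boolean append(Map<String, String> topicConfig, RecordsPolicy policy, String topic,
                          int partition, List<Record> batch, List<Record> log) {
        if (Boolean.parseBoolean(topicConfig.get("records.policy.enabled"))) {
            try {
                policy.validate(topic, partition, batch);
            } catch (PolicyViolationException e) {
                return false; // nothing appended; the producer would receive an error
            }
        }
        log.addAll(batch);
        return true;
    }

    /** Builds an in-memory record; a null key or value means "absent". */
    static Record record(String key, String value) {
        ByteBuffer k = key == null ? null : ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
        ByteBuffer v = value == null ? null : ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
        long ts = System.currentTimeMillis();
        return new Record() {
            @Override public long timestamp() { return ts; }
            @Override public boolean hasKey() { return k != null; }
            @Override public ByteBuffer key() { return k; }
            @Override public boolean hasValue() { return v != null; }
            @Override public ByteBuffer value() { return v; }
        };
    }
}
```

Validating the whole batch before the append keeps the operation all-or-nothing. This matches the proposal's statement that no data is added to the log when validation fails.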