Hi Tom,

thanks for your comments, replies inline below.

On Thu, 22 Jun 2023 at 10:58, Tom Bentley <tbent...@redhat.com> wrote:
>
> Hi Edorado and Adrian,
>
> Thanks for the KIP.
>
> I think it would be good to elaborate on exactly how validate() gets
> called, because I think there are a number of potential problems, or at
> least things to consider.
>
> From the broker's point of view, validate() can do arbitrary things. It
> might never return due to blocking or an infinite loop. It might cause an
> OOME, or throw a StackOverflowException. These are not entirely unlikely
> and the risks cannot always be easily avoided by a careful policy
> implementation. For example, a plugin which performs schema validation
> would typically be fetching schema information from a remote registry (and
> caching it for future use), and so could block on the networking (there are
> further questions about retry in the case of network error). Then, when
> deserializing a format like JSON deserializers might be prone to SOE or
> OOME (e.g. consider a naive recursive JSON parser with JSON input starting
> "[[[[[[[[[[[[[[[[[[[[[[[[[[[[..."). More generally, incorrect
> deserialization of untrusted input is a common kind of CVE. Similarly
> validation might involve regular expression matching (for example
> JSONSchema supports pattern constraints). The matcher implementation really
> matters and common matchers, including Java's Pattern, expose you to the
> possibility of nasty exponential time behaviour.

We agree with your observations: running third-party code inside the
broker exposes it to these problems.
The Authorizer, for example, although it is not typically involved in
deserializing user input and is not invoked while holding a lock,
is an existing plugin invoked from the IO threads whose
implementations might access external systems.
Server-side input validation carries a trade-off between functionality
and risk; if that trade-off is not acceptable in a given deployment,
the feature should not be enabled.

An implementation could use its own thread pool and bound the call
from the IO thread with a timeout.
We do not think such a solution should be mandated as part of the
plugin interface.
We expect the record validation plugin implementations used in
a production system to be production-quality code,
likely developed and tested by the schema registry provider, as the serdes are.
In fact, there is a natural semantic coupling between the serdes and
the validator.
We do not expect Kafka cluster administrators to run arbitrary code
within their brokers.
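To make the thread-pool idea concrete, here is a minimal sketch of
bounding a potentially blocking validate() call with a timeout, so a
stuck policy cannot pin an IO thread indefinitely. The
RecordValidationPolicy interface and its validate() signature below are
assumptions for illustration, not the KIP's finalized API:

```java
import java.util.concurrent.*;

public class TimeoutBoundedValidation {

    // Hypothetical plugin interface; the real KIP interface may differ.
    interface RecordValidationPolicy {
        void validate(byte[] recordValue) throws Exception;
    }

    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    // Runs the policy on the pool and waits at most timeoutMs.
    // Returns false on timeout, validation failure, or any other error.
    boolean validateWithTimeout(RecordValidationPolicy policy, byte[] value, long timeoutMs) {
        Future<?> f = pool.submit(() -> { policy.validate(value); return null; });
        try {
            f.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            f.cancel(true); // interrupt the stuck validator thread
            return false;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        TimeoutBoundedValidation v = new TimeoutBoundedValidation();
        // A well-behaved policy completes within the budget.
        System.out.println(v.validateWithTimeout(r -> {}, new byte[0], 1000));
        // A policy that blocks is cut off by the timeout.
        System.out.println(v.validateWithTimeout(r -> Thread.sleep(60_000), new byte[0], 50));
        v.pool.shutdownNow();
    }
}
```

Note that cancel(true) only helps if the policy responds to interruption;
a policy stuck in uninterruptible native I/O would still leak a pool
thread, which is one more reason not to bake this into the interface.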

Furthermore, not all validation requires parsing the message
payload to provide value.
For example, a policy that checks that records carry a valid schema ID
would prevent common misconfigurations, such as running a client
without a registry's serdes.
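As an illustration of such a cheap check, the sketch below only inspects
the wire-format header of a record value (a zero magic byte followed by
a 4-byte schema ID, the framing convention used by the Confluent serdes)
without deserializing the payload at all; adapt the framing to whatever
your registry's serdes actually write:

```java
import java.nio.ByteBuffer;

public class SchemaIdCheck {

    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID if the header is well formed, or -1 if the
    // value is too short or does not start with the magic byte.
    static int extractSchemaId(byte[] value) {
        if (value == null || value.length < 5 || value[0] != MAGIC_BYTE) {
            return -1;
        }
        return ByteBuffer.wrap(value, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // A value framed by registry-aware serdes: magic byte + schema ID 42.
        byte[] framed = ByteBuffer.allocate(9).put(MAGIC_BYTE).putInt(42).putInt(0).array();
        // A value produced without the registry's serdes.
        byte[] raw = "not framed".getBytes();
        System.out.println(extractSchemaId(framed)); // 42
        System.out.println(extractSchemaId(raw));    // -1
    }
}
```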

> You mentioned LogValidator in the KIP. This executes on an IO thread and
> gets called with the log lock held. So the consequences of the validation
> blocking could actually be a real problem from a broker availability PoV if
> this validation happens in the same place. In the worst case all the IO
> threads get stuck because of bad input (perhaps from a single producer), or
> network problems between the broker and the registry. I don't think simply
> moving the validation to before the acquisition of the lock is an easy
> solution either, because of the dependency on the compression validation.

The existing LogValidator seems a natural point at which to perform an
optional, deeper validation than the current one.
Again, an implementation that uses a timeout-bounded call seems a possibility.

Thanks to your observation, we think some metrics should be introduced
to monitor the plugin's behaviour.
We could enhance the KIP by introducing metrics similar to the existing
ones for message conversions and invalid messages, e.g.

kafka.network:type=RequestMetrics,name=MessageValidationTimeMs

kafka.server:type=BrokerTopicMetrics,name=ProduceMessageValidationsPerSec
kafka.server:type=BrokerTopicMetrics,name=ProduceMessageValidationsPerSec,topic=<topic>

kafka.server:type=BrokerTopicMetrics,name=InvalidMessageRecordsPerSec
kafka.server:type=BrokerTopicMetrics,name=InvalidMessageRecordsPerSec,topic=<topic>

What do you think?

Thanks,
Edo & Adrian
