Thanks for the KIP, Alieh! It addresses an important case for error handling.

I agree that using this handler would be an expert API, as mentioned by a few people. But I don't think that's a reason not to add it. It's always a tricky tradeoff deciding what to expose to users while avoiding foot guns, but we added similar handlers to Kafka Streams and have had good experience with them. Hence, I understand, but don't share, the concern raised.

I also agree that the user bears some responsibility to understand how such a handler should be implemented so as not to drop data by accident. But that seems unavoidable and acceptable.

While I understand that a "simpler / reduced" API (eg via configs) might also work, I personally prefer a full handler. Configs have the same issue that they could be misused, potentially leading to incorrectly dropped data, but at the same time they are less flexible (and thus maybe even harder to use correctly...?). Based on my experience, there are also often weird corner cases for which it makes sense to drop records for other exceptions too, and a full handler has the advantage of full flexibility and "absolute power!".

To be fair: I don't know the exact code paths of the producer in detail, so please keep me honest. But my understanding is that the KIP aims to allow users to react to internal exceptions and decide to keep retrying internally, swallow the error and drop the record, or raise the error?

Maybe the KIP needs to be a little more precise about which errors we want to cover -- I don't think this list must be exhaustive, as we can always do follow-up KIPs to apply the handler to other errors and expand its scope. The KIP does mention examples, but it might be good to explicitly state for which cases the handler gets applied?

I am also not sure if CONTINUE and FAIL are enough options? Don't we need three options? Or would `CONTINUE` have different meanings depending on the type of error? Ie, for a retryable error `CONTINUE` would mean keep retrying internally, but for a non-retryable error `CONTINUE` would mean swallow the error and drop the record? This semantic overload seems tricky for users to reason about, so it might be better to split `CONTINUE` into two cases -> `RETRY` and `SWALLOW` (or some better names).
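
A rough sketch of what I have in mind (all names and signatures are just placeholders, not a concrete proposal):

   import org.apache.kafka.clients.producer.ProducerRecord;

   public enum ClientExceptionHandlerResponse {
       FAIL,    // raise the error to the application
       RETRY,   // keep retrying internally (retryable errors only)
       SWALLOW  // drop the record and continue
   }

   public interface ClientExceptionHandler {
       ClientExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                             Exception exception);
   }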

Additionally, should we just ship a `DefaultClientExceptionHandler` which always returns `FAIL`, for backward compatibility? Or not have any default handler to begin with and allow it to be `null`? I don't see the need for a specific `TransactionExceptionHandler`. To me, the goal should be to not modify the default behavior at all, but just to allow users to change it if there is a need.
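
Just to illustrate, the backward-compatible default could be as simple as this (again, only a sketch building on the hypothetical interface above):

   // Preserves today's behavior: every handled error is raised to the application.
   public class DefaultClientExceptionHandler implements ClientExceptionHandler {
       @Override
       public ClientExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                    Exception exception) {
           return ClientExceptionHandlerResponse.FAIL;
       }
   }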

What is missing in the KIP, though, is how the handler is passed into the producer. Would we need a new config which allows setting a custom handler? And/or would we allow passing in an instance via the constructor, or add a new method to set a handler?
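
For example, the config route could look something like this (the `custom.exception.handler` key and `MyExceptionHandler` class are made up for illustration; the KIP would need to define the actual names):

   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.Producer;

   Properties props = new Properties();
   props.put("bootstrap.servers", "localhost:9092");
   props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
   props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
   // hypothetical config, analogous to how interceptors are configured today
   props.put("custom.exception.handler", MyExceptionHandler.class.getName());
   Producer<byte[], byte[]> producer = new KafkaProducer<>(props);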


-Matthias

On 4/18/24 10:02 AM, Andrew Schofield wrote:
Hi Alieh,
Thanks for the KIP.

Exception handling in the Kafka producer and consumer is really not ideal.
It’s even harder working out what’s going on with the consumer.

I’m a bit nervous about this KIP and I agree with Chris that it could do with additional motivation. This would be an expert-level interface given how complicated the exception handling for Kafka has become.

7. The application is not really aware of the batching being done on its behalf.
The ProduceResponse can actually return an array of records which failed
per batch. If you get RecordTooLargeException, and want to retry, you probably
need to remove the offending records from the batch and retry it. This is getting fiddly.

8. There is already o.a.k.clients.producer.Callback. I wonder whether an
alternative might be to add a method to the existing Callback interface, such as:

   ClientExceptionResponse onException(Exception exception)

It would be called when a ProduceResponse contains an error, but the
producer is going to retry. It tells the producer whether to go ahead with the retry or not. The default implementation would be to CONTINUE, because that’s
just continuing to retry as planned. Note that this is a per-record callback, so
the application would be able to understand which records failed.
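
To sketch it out (illustrative only; the new method gets a default implementation so that existing Callback implementations keep compiling, and ClientExceptionResponse is a hypothetical enum with at least a CONTINUE value):

   import org.apache.kafka.clients.producer.RecordMetadata;

   public interface Callback {
       void onCompletion(RecordMetadata metadata, Exception exception);

       // Called when a ProduceResponse contains an error and the producer is
       // about to retry; CONTINUE means proceed with the retry as planned.
       default ClientExceptionResponse onException(Exception exception) {
           return ClientExceptionResponse.CONTINUE;
       }
   }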

By using an existing interface, we already know how to configure it and we know
about the threading model for calling it.


Thanks,
Andrew



On 17 Apr 2024, at 18:17, Chris Egerton <chr...@aiven.io.INVALID> wrote:

Hi Alieh,

Thanks for the KIP! The issue with writing to non-existent topics is
particularly frustrating for users of Kafka Connect and has been the source
of a handful of Jira tickets over the years. My thoughts:

1. An additional detail we can add to the motivation (or possibly rejected
alternatives) section is that this kind of custom retry logic can't be
implemented by hand by, e.g., setting retries to 0 in the producer config
and handling exceptions at the application level. Or rather, it can, but 1)
it's a bit painful to have to reimplement at every call-site for
Producer::send (and any code that awaits the returned Future) and 2) it's
impossible to do this without losing idempotency on retries.
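
To make the pain concrete, here's roughly what the hand-rolled version looks like at a single call-site (sketch only; assumes the enclosing method declares the checked exceptions and `maxAttempts` is application-defined):

   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.clients.producer.RecordMetadata;
   import org.apache.kafka.common.errors.RetriableException;

   // retries=0 in the producer config; retry manually at the application level.
   // Note: the producer cannot keep idempotency guarantees across manual retries.
   RecordMetadata metadata = null;
   for (int attempt = 0; attempt < maxAttempts && metadata == null; attempt++) {
       try {
           metadata = producer.send(record).get();
       } catch (ExecutionException e) {
           if (!(e.getCause() instanceof RetriableException)) {
               throw e; // non-retriable: surface the error
           }
           // retriable: loop around and send again
       }
   }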

2. That said, I wonder if a pluggable interface is really the right call
here. Documenting the interactions of a producer with
a ClientExceptionHandler instance will be tricky, and implementing them
will also be a fair amount of work. I believe that there needs to be some
more granularity for how writes to non-existent topics (or really,
UNKNOWN_TOPIC_OR_PARTITION and related errors from the broker) are handled,
but I'm torn between keeping it simple with maybe one or two new producer
config properties, or a full-blown pluggable interface. If there are more
cases that would benefit from a pluggable interface, it would be nice to
identify these and add them to the KIP to strengthen the motivation. Right
now, I'm not sure the two that are mentioned in the motivation are
sufficient.

3. Alternatively, a possible compromise is for this KIP to introduce new
properties that dictate how to handle unknown-topic-partition and
record-too-large errors, with the thinking that if we introduce a pluggable
interface later on, these properties will be recognized by the default
implementation of that interface but could be completely ignored or
replaced by alternative implementations.
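
For illustration, such properties might look like this (the names are completely made up):

   import java.util.Properties;

   Properties props = new Properties();
   // hypothetical per-error-type configs, not proposed names
   props.put("unknown.topic.partition.error.action", "retry"); // or "fail" / "drop"
   props.put("record.too.large.error.action", "drop");         // or "fail"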

4. (Nit) You can remove the "This page is meant as a template for writing a
KIP..." part from the KIP. It's not a template anymore :)

5. If we do go the pluggable interface route, wouldn't we want to add the
possibility for retry logic? The simplest version of this could be to add a
RETRY value to the ClientExceptionHandlerResponse enum.

6. I think "SKIP" or "DROP" might be clearer instead of "CONTINUE" for
the ClientExceptionHandlerResponse enum, since they cause records to be
dropped.

Cheers,

Chris

On Wed, Apr 17, 2024 at 12:25 PM Justine Olshan
<jols...@confluent.io.invalid> wrote:

Hey Alieh,

I echo what Omnia says; I'm not sure I understand the implications of the change, and I think more detail is needed.

This comment also confused me a bit:
* {@code ClientExceptionHandler} that continues the transaction even if a
record is too large.
* Otherwise, it makes the transaction to fail.

Relatedly, I've been working with some folks on a KIP for transaction errors and how they are handled. Specifically for RecordTooLargeException (and a few other errors), we want to introduce a new error category that allows the application to choose how the error is handled. Maybe that KIP is what you are looking for? Stay tuned :)

Justine





On Wed, Apr 17, 2024 at 8:03 AM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
wrote:

Hi Alieh,
Thanks for the KIP! I have a couple of comments:
- You mentioned in the KIP motivation: "Another example for which a production exception handler could be useful is if a user tries to write into a non-existing topic, which returns a retryable error code; with infinite retries, the producer would hang retrying forever. A handler could help to break the infinite retry loop."

How can the handler differentiate between something temporary, where it should keep retrying, and something permanent, like forgetting to create the topic? Temporary here could be:
  - the producer gets deployed before the topic creation finishes (especially if the topic creation is handled via IaC)
  - temporary offline partitions
  - leadership changes
Isn’t this putting the producer at risk of dropping records unintentionally?
- Can you elaborate more on the compatibility / migration plan section please, explaining in a bit more detail what the changing behaviour is and how it will impact clients who are upgrading?
- In the proposed changes, can you elaborate in the KIP on where in the producer lifecycle ClientExceptionHandler and TransactionExceptionHandler get triggered, and how the producer will configure them to point to a custom implementation?

Thanks
Omnia

On 17 Apr 2024, at 13:13, Alieh Saeedi <asae...@confluent.io.INVALID>
wrote:

Hi all,
Here is KIP-1038: Add Custom Error Handler to Producer:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1038%3A+Add+Custom+Error+Handler+to+Producer

I look forward to your feedback!

Cheers,
Alieh



