Guozhang Wang created KAFKA-10829:
-------------------------------------

             Summary: Kafka Streams handle produce exception improvement
                 Key: KAFKA-10829
                 URL: https://issues.apache.org/jira/browse/KAFKA-10829
             Project: Kafka
          Issue Type: Improvement
          Components: producer, streams
            Reporter: Guozhang Wang


A summary of some recent discussions on how we should improve embedded
producer exception handling.

Note that in all cases below, the baseline logic guarantees that our
correctness semantics are not violated; the optimizations are layered on top of
the baseline to reduce the user's burden by letting the library auto-handle
certain types of exceptions.

1) ``Producer.send()`` throws an exception directly:

1.a) The baseline logic (to ensure correctness) is to always wrap the exception
as a StreamsException, which causes the thread to shut down and triggers the
exception handler. The handler can inspect the wrapped exception and decide
whether the shut-down thread can be restarted.

1.b) The optimization is to inspect the exception and decide whether it can be
wrapped as a TaskMigratedException instead (e.g. ProducerFencedException). This
would then be handled automatically by treating all tasks as lost and
re-joining the group.
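The decision in 1.a and 1.b can be sketched as a small translation step applied when ``Producer.send()`` throws. This is a minimal, self-contained illustration, not the Kafka Streams implementation: the exception classes are simplified stand-ins that share names with the real Kafka classes (their hierarchy is flattened here), ``SendErrorTranslator`` is a hypothetical helper, and ProducerFencedException is assumed to be the only exception eligible for the TaskMigrated path.

```java
// Simplified stand-ins for the Kafka exception classes named above. The real
// classes live in org.apache.kafka.common.errors / org.apache.kafka.streams.errors;
// the hierarchy is flattened here for illustration.
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
    KafkaException(String message, Throwable cause) { super(message, cause); }
}

class ProducerFencedException extends KafkaException {
    ProducerFencedException(String message) { super(message); }
}

class StreamsException extends KafkaException {
    StreamsException(String message, Throwable cause) { super(message, cause); }
}

class TaskMigratedException extends KafkaException {
    TaskMigratedException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical helper: translates an exception thrown synchronously by send().
class SendErrorTranslator {
    static RuntimeException translate(RuntimeException e, boolean optimized) {
        // 1.b) Optimization: a fenced producer means the task has moved to another
        // instance, so the library can treat all tasks as lost and re-join.
        if (optimized && e instanceof ProducerFencedException) {
            return new TaskMigratedException("Producer got fenced", e);
        }
        // 1.a) Baseline: always wrap as StreamsException. Throwing it shuts the
        // thread down; the user's handler can inspect getCause() to decide
        // whether the thread may be restarted.
        return new StreamsException("Error encountered sending record", e);
    }
}
```

A caller would catch the exception around ``send()`` and rethrow ``SendErrorTranslator.translate(e, true)``. Keeping the baseline as the fallback means an unrecognized exception is never silently downgraded.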

2) The ``Producer.send()`` callback receives an exception:

2.a) The baseline is to first check whether the exception is an instance of
RetriableException.

If it is retriable, pass it to the production exception handler to decide
whether to throw or to continue with the record dropped. If the handler decides
to throw, always wrap the exception as a StreamsException and remember it
locally; at the same time, stop sending more records from the caller. On the
next send call, check for the remembered exception and throw it. This causes
the thread to shut down and triggers the exception handler.

If the exception is not retriable, always throw it as a fatal StreamsException.
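The baseline in 2.a can be sketched as a collector that classifies the callback's exception, remembers any failure, and rethrows it on the next send. This is a minimal illustration under stated assumptions, not the Kafka Streams code: ``RecordCollectorSketch`` and ``HandlerResponse`` are hypothetical, the handler is modelled as a plain ``Function`` rather than the real ``ProductionExceptionHandler`` interface, and the exception classes are simplified stand-ins. Because a callback cannot throw to the caller, the non-retriable case is also remembered and thrown on the next send.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Simplified stand-ins for the Kafka exception classes (hierarchy flattened).
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
    KafkaException(String message, Throwable cause) { super(message, cause); }
}

class RetriableException extends KafkaException {
    RetriableException(String message) { super(message); }
}

class StreamsException extends KafkaException {
    StreamsException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical stand-in for the production exception handler's verdict.
enum HandlerResponse { CONTINUE, FAIL }

// Hypothetical, simplified collector showing the 2.a baseline.
class RecordCollectorSketch {
    private final Function<Exception, HandlerResponse> handler;
    // Written by the producer's callback thread, read by the stream thread.
    private volatile StreamsException sendException;
    final AtomicInteger dropped = new AtomicInteger();
    final List<String> sent = new ArrayList<>();

    RecordCollectorSketch(Function<Exception, HandlerResponse> handler) {
        this.handler = handler;
    }

    // Invoked with the exception passed to the send() callback (null on success).
    void onCompletion(Exception e) {
        if (e == null || sendException != null) {
            return; // success, or a failure is already remembered
        }
        if (e instanceof RetriableException) {
            if (handler.apply(e) == HandlerResponse.FAIL) {
                sendException = new StreamsException("Error sending record", e);
            } else {
                dropped.incrementAndGet(); // continue with the record dropped
            }
        } else {
            // Non-retriable: always fatal.
            sendException = new StreamsException("Fatal error sending record", e);
        }
    }

    // Stands in for the stream thread handing a record to the producer.
    void send(String record) {
        StreamsException remembered = sendException;
        if (remembered != null) {
            throw remembered; // stop sending; this shuts the thread down
        }
        sent.add(record); // the real client would call producer.send(record, callback)
    }
}
```

Checking the remembered exception before every send is what stops further records from going out once a failure has been recorded.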

2.b) Optimization one: if the non-retriable exception can be translated into a
TaskMigratedException, do not wrap it as a StreamsException; let the library
handle it internally.

2.c) Optimization two: if the retriable exception is a TimeoutException, do
not pass it to the production exception handler; treat it as TaskMigrated
instead.
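Optimizations 2.b and 2.c only change how the callback's exception is routed, so they can be sketched as a classification step. The stand-in classes, the hypothetical ``CallbackClassifier`` helper and ``CallbackOutcome`` enum, and the choice of ProducerFencedException as the translatable non-retriable exception (borrowed from 1.b) are all assumptions for illustration.

```java
// Simplified stand-ins (hierarchy partly flattened). As in Kafka,
// TimeoutException is modelled as a RetriableException.
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
}

class RetriableException extends KafkaException {
    RetriableException(String message) { super(message); }
}

class TimeoutException extends RetriableException {
    TimeoutException(String message) { super(message); }
}

class ProducerFencedException extends KafkaException {
    ProducerFencedException(String message) { super(message); }
}

// What the callback should do with an exception under 2.a to 2.c.
enum CallbackOutcome { ASK_HANDLER, TASK_MIGRATED, FATAL }

class CallbackClassifier {
    static CallbackOutcome classify(Exception e) {
        if (e instanceof RetriableException) {
            // 2.c) A timeout bypasses the handler and is treated as TaskMigrated.
            if (e instanceof TimeoutException) {
                return CallbackOutcome.TASK_MIGRATED;
            }
            // 2.a) Other retriable errors go to the production exception handler.
            return CallbackOutcome.ASK_HANDLER;
        }
        // 2.b) Non-retriable but translatable (assumed here: fencing) is
        // handled internally instead of being wrapped as StreamsException.
        if (e instanceof ProducerFencedException) {
            return CallbackOutcome.TASK_MIGRATED;
        }
        // 2.a) Everything else becomes a fatal StreamsException.
        return CallbackOutcome.FATAL;
    }
}
```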

3) ``Producer.XXXTxn`` APIs other than ``abortTxn`` throw an exception directly:

3.a) The baseline logic is to catch all KafkaExceptions except
TimeoutException and handle them as *TaskCorrupted* (which includes aborting
the transaction, resetting the state, and re-joining the group).
TimeoutException is rethrown.

3.b) Optimization: some exceptions can be handled as TaskMigrated instead,
which is a lighter-weight recovery.
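The handling in 3.a and 3.b can be sketched as a wrapper around each transactional call. ``TxnCalls`` is hypothetical, the exception classes are simplified stand-ins (the real TaskCorruptedException, for instance, also records which tasks are affected), and ProducerFencedException is only an assumed example of an exception that takes the lighter TaskMigrated path.

```java
// Simplified stand-ins for the Kafka/Streams exception classes (hierarchy
// flattened; the real TaskCorruptedException also carries the affected tasks).
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
}

class TimeoutException extends KafkaException {
    TimeoutException(String message) { super(message); }
}

class ProducerFencedException extends KafkaException {
    ProducerFencedException(String message) { super(message); }
}

class TaskCorruptedException extends RuntimeException {
    TaskCorruptedException(String message, Throwable cause) { super(message, cause); }
}

class TaskMigratedException extends RuntimeException {
    TaskMigratedException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical wrapper around a transactional call other than abortTxn
// (e.g. begin or commit).
class TxnCalls {
    static void run(String name, Runnable txnCall) {
        try {
            txnCall.run();
        } catch (TimeoutException e) {
            throw e; // rethrown unchanged
        } catch (ProducerFencedException e) {
            // 3.b) Assumed example of an exception eligible for the lighter path.
            throw new TaskMigratedException(name + " failed: producer fenced", e);
        } catch (KafkaException e) {
            // 3.a) Baseline: abort the transaction, reset state, re-join.
            throw new TaskCorruptedException(name + " failed", e);
        }
    }
}
```

The more specific catch clauses must come before ``KafkaException``; Java rejects a subclass catch placed after its superclass, which conveniently enforces the "exceptions first, baseline last" ordering.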

4) ``Producer.abortTxn`` throws an exception:

4.a) The baseline logic is to catch all KafkaExceptions except
TimeoutException and treat them as a fatal StreamsException. TimeoutException
is rethrown.

4.b) Optimization: some exceptions can be ignored (e.g. invalidTxnTransition
means the abort did not succeed).
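The handling in 4.a and 4.b for ``abortTxn`` can be sketched the same way. ``AbortTxnSketch`` and ``InvalidTxnTransitionException`` are hypothetical names; the latter merely stands in for however the invalidTxnTransition case is actually signaled, and the other exception classes are simplified stand-ins.

```java
// Simplified stand-ins (hierarchy flattened). InvalidTxnTransitionException is
// a hypothetical class standing in for the "invalidTxnTransition" case above.
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
}

class TimeoutException extends KafkaException {
    TimeoutException(String message) { super(message); }
}

class InvalidTxnTransitionException extends KafkaException {
    InvalidTxnTransitionException(String message) { super(message); }
}

class StreamsException extends RuntimeException {
    StreamsException(String message, Throwable cause) { super(message, cause); }
}

class AbortTxnSketch {
    // Returns true if the abort call completed, false if its error was ignored.
    static boolean abort(Runnable abortCall) {
        try {
            abortCall.run();
            return true;
        } catch (TimeoutException e) {
            throw e; // rethrown unchanged
        } catch (InvalidTxnTransitionException e) {
            return false; // 4.b) Optimization: treated as ignorable
        } catch (KafkaException e) {
            // 4.a) Baseline: anything else is fatal.
            throw new StreamsException("Error aborting transaction", e);
        }
    }
}
```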



--
This message was sent by Atlassian Jira
(v8.3.4#803005)