Hi Matthias, 

To sum up the proposed changes to the ProductionExceptionHandler callback 
methods (106):

A new method ProductionExceptionHandler#handle is added with the following 
signature:

> ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, 
> final ProducerRecord<?,?> record, final Exception exception);

The ProducerRecord<?,?> parameter has changed to accept a serialized or 
non-serialized record. 
Thus, the new ProductionExceptionHandler#handle method can handle both 
production exceptions and serialization exceptions.

Both the old ProductionExceptionHandler#handle and 
ProductionExceptionHandler#handleSerializationException methods are now 
deprecated.
The old ProductionExceptionHandler#handle method gets a default implementation, 
so users do not have to implement a deprecated method.

To handle backward compatibility, the new ProductionExceptionHandler#handle 
method gets a default implementation.

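> default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext 
> context, final ProducerRecord<?, ?> record, final Exception exception) {
>   if (exception instanceof RecordSerializationException) {
>       // Serialization failure: unwrap the cause and delegate to the
>       // deprecated serialization callback, returning its response.
>       return this.handleSerializationException(record, 
>           (Exception) exception.getCause());
>   }
>
>   // Any other failure: delegate to the deprecated production callback.
>   return handle((ProducerRecord<byte[], byte[]>) record, exception);
> }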

The default implementation invokes either #handleSerializationException or 
#handle, depending on the type of the exception. Thus, custom implementations 
of the deprecated ProductionExceptionHandler#handle or 
ProductionExceptionHandler#handleSerializationException methods won't break.
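
To illustrate the backward-compatibility path, here is a minimal standalone 
sketch. It does not use the Kafka classes: Response, Rec, Ctx, SerFailure and 
Handler are simplified stand-ins made up for this illustration (for 
ProductionExceptionHandlerResponse, ProducerRecord, ErrorHandlerContext, 
RecordSerializationException and ProductionExceptionHandler respectively). It 
only shows that a handler overriding the two deprecated methods is still 
reached through the new one.

```java
public class BackCompatSketch {
    // Stand-in for ProductionExceptionHandlerResponse (simplified, hypothetical).
    enum Response { CONTINUE, FAIL }

    // Stand-in for ProducerRecord<K, V>.
    static final class Rec<K, V> {
        final K key;
        final V value;
        Rec(final K key, final V value) { this.key = key; this.value = value; }
    }

    // Stand-in for ErrorHandlerContext.
    static final class Ctx { }

    // Stand-in for RecordSerializationException: wraps the original failure.
    static final class SerFailure extends RuntimeException {
        SerFailure(final Throwable cause) { super(cause); }
    }

    // Stand-in for ProductionExceptionHandler.
    interface Handler {
        @Deprecated
        default Response handle(final Rec<byte[], byte[]> record, final Exception exception) {
            return Response.FAIL;
        }

        @Deprecated
        default Response handleSerializationException(final Rec<?, ?> record, final Exception exception) {
            return Response.FAIL;
        }

        // New unified callback: routes to one of the deprecated callbacks.
        // The cast of the cause assumes it is an Exception, not an Error.
        @SuppressWarnings("unchecked")
        default Response handle(final Ctx context, final Rec<?, ?> record, final Exception exception) {
            if (exception instanceof SerFailure) {
                return handleSerializationException(record, (Exception) exception.getCause());
            }
            return handle((Rec<byte[], byte[]>) record, exception);
        }
    }

    // A "legacy" user handler that only knows the deprecated callbacks.
    static final class LegacyHandler implements Handler {
        @Override
        public Response handle(final Rec<byte[], byte[]> record, final Exception exception) {
            return Response.FAIL;
        }

        @Override
        public Response handleSerializationException(final Rec<?, ?> record, final Exception exception) {
            return Response.CONTINUE;
        }
    }

    public static void main(final String[] args) {
        final Handler handler = new LegacyHandler();
        // Serialization failure: routed to handleSerializationException.
        final Response ser = handler.handle(new Ctx(), new Rec<>("k", "v"),
            new SerFailure(new IllegalStateException("bad key")));
        // Production failure: routed to the old handle.
        final Response prod = handler.handle(new Ctx(), new Rec<>(new byte[0], new byte[0]),
            new RuntimeException("broker down"));
        System.out.println(ser + " " + prod); // prints "CONTINUE FAIL"
    }
}
```

The runtime only ever calls the three-argument method, and the legacy 
overrides still decide the outcome.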

The new ProductionExceptionHandler#handle method is now also invoked when a 
serialization exception occurs:

> public <K, V> void send(final String topic, final K key, final V value, ...) {
>     try {
>         keyBytes = keySerializer.serialize(topic, headers, key);
>         ...   
>     } catch (final ClassCastException exception) {
>       ...
>     } catch (final Exception exception) {
>     
>         try {
>             response = productionExceptionHandler.handle(context, record, new 
> RecordSerializationException(SerializationExceptionOrigin.KEY, exception));
>         } catch (final Exception e) {
>         ...
>         }
>     }
> }

To wrap the original serialization exception and determine whether it comes 
from the key or the value, a new exception class is created:

> public class RecordSerializationException extends SerializationException {
>     public enum SerializationExceptionOrigin {
>         KEY,
>         VALUE
>     }
>
>     public RecordSerializationException(SerializationExceptionOrigin origin, 
> final Throwable cause);
> }
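
As a rough illustration of how a handler could use the origin, here is a 
standalone sketch. The class below is a stand-in, not the proposed Kafka class: 
it extends RuntimeException to avoid the Kafka dependency, and the origin() 
accessor and the describe() helper are hypothetical names introduced only for 
this example (the snippet above shows the constructor only).

```java
public class OriginSketch {
    // Mirrors the proposed SerializationExceptionOrigin enum.
    enum Origin { KEY, VALUE }

    // Stand-in for the proposed RecordSerializationException.
    static final class RecordSerializationFailure extends RuntimeException {
        private final Origin origin;

        RecordSerializationFailure(final Origin origin, final Throwable cause) {
            super(cause);
            this.origin = origin;
        }

        // Hypothetical accessor, assumed for this example.
        Origin origin() {
            return origin;
        }
    }

    // Hypothetical helper: builds a log message that tells key failures,
    // value failures and plain production failures apart.
    static String describe(final Exception exception) {
        if (exception instanceof RecordSerializationFailure) {
            final RecordSerializationFailure failure = (RecordSerializationFailure) exception;
            return "serialization failed for " + failure.origin()
                + ": " + failure.getCause().getMessage();
        }
        return "production failed: " + exception.getMessage();
    }

    public static void main(final String[] args) {
        System.out.println(describe(new RecordSerializationFailure(
            Origin.KEY, new IllegalArgumentException("not a String"))));
        System.out.println(describe(new RuntimeException("timeout")));
    }
}
```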

Hope it all makes sense,
Loïc
