[ 
https://issues.apache.org/jira/browse/KAFKA-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17259416#comment-17259416
 ] 

John Roesler commented on KAFKA-9566:
-------------------------------------

Hi all,

Thanks, [~mjsax], that's a good thought.

I just took a look at the relevant interfaces, and unfortunately, I don't think 
the new Context interface improves the situation much. The 
DeserializationExceptionHandler already has all the "record context" 
information from the raw ConsumerRecord it gets, so we must have provided the 
ProcessorContext to inject other information, like the application and task id, 
maybe the default serdes or metrics, etc.

Unfortunately, the new interfaces don't have a nice "slice" of these 
informational methods that we could pass to the DeserializationExceptionHandler 
without also exposing forward().

It seems like a JavaDoc improvement is the best way forward for this right now, 
unless we want to propose a completely new DeserializationExceptionHandler that 
just provides the pieces of information we think it needs.
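
To make that second option a bit more concrete, here is a rough, self-contained 
sketch of what a narrower handler contract could look like. Every type in it 
(HandlerInfo, FullContext, InfoOnlyView, NarrowDeserializationHandler, 
HandlerResponse, RecordingHandler) is a hypothetical stand-in invented for 
illustration, not an existing Kafka Streams API, and the choice of applicationId 
and taskId as the exposed methods is only an assumption based on the information 
mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

// All types below are hypothetical stand-ins, not Kafka Streams APIs.

// Read-only "slice": informational methods only, no forward().
interface HandlerInfo {
    String applicationId();
    String taskId();
}

// Stand-in for a full processor context that can also forward records.
interface FullContext extends HandlerInfo {
    void forward(Object key, Object value);
}

// Wraps a full context so a handler cannot reach forward(), even by downcasting.
final class InfoOnlyView implements HandlerInfo {
    private final FullContext delegate;

    InfoOnlyView(FullContext delegate) {
        this.delegate = delegate;
    }

    @Override
    public String applicationId() {
        return delegate.applicationId();
    }

    @Override
    public String taskId() {
        return delegate.taskId();
    }
}

enum HandlerResponse { CONTINUE, FAIL }

// Hypothetical handler contract that only ever receives the narrow view.
interface NarrowDeserializationHandler {
    HandlerResponse handle(HandlerInfo info, byte[] rawKey, byte[] rawValue, Exception exception);
}

// Example handler: records the failure and asks the caller to skip the record.
final class RecordingHandler implements NarrowDeserializationHandler {
    final List<String> errors = new ArrayList<>();

    @Override
    public HandlerResponse handle(HandlerInfo info, byte[] rawKey, byte[] rawValue, Exception exception) {
        errors.add(info.applicationId() + "/" + info.taskId() + ": " + exception.getMessage());
        return HandlerResponse.CONTINUE;
    }
}
```

With a contract like this, calling forward() from the handler would be a 
compile-time error rather than a NullPointerException at runtime, at the cost 
of introducing and maintaining a new public interface.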

> ProcessorContextImpl#forward throws NullPointerException if invoked from 
> DeserializationExceptionHandler
> --------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9566
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Tomas Mi
>            Priority: Minor
>
> Hi, I am trying to implement a custom DeserializationExceptionHandler that 
> would forward an exception to downstream processor(s), but 
> ProcessorContextImpl#forward throws a NullPointerException if invoked from 
> this custom handler.
> Handler implementation:
> {code:title=MyDeserializationExceptionHandler.java}
> public class MyDeserializationExceptionHandler implements DeserializationExceptionHandler {
>     @Override
>     public void configure(Map<String, ?> configs) {
>     }
>     @Override
>     public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record, Exception exception) {
>         context.forward(null, exception, To.child("error-processor"));
>         return DeserializationHandlerResponse.CONTINUE;
>     }
> }
> {code}
> The handler is wired as the default deserialization exception handler:
> {code}
>     private TopologyTestDriver initializeTestDriver(StreamsBuilder streamBuilder) {
>         Topology topology = streamBuilder.build();
>         Properties props = new Properties();
>         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-test-application");
>         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
>         props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
>         props.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, MyDeserializationExceptionHandler.class.getName());
>         return new TopologyTestDriver(topology, props);
>     }
> {code}
>  
> Exception stacktrace:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: Fatal user code error in deserialization error callback
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)
>     at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101)
>     at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136)
>     at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742)
>     at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392)
>     ...
> Caused by: java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165)
>     at MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204)
>     at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70)
>     ... 33 more
> {noformat}
> Neither the DeserializationExceptionHandler nor the ProcessorContext javadocs 
> mention that ProcessorContext#forward(...) must not be invoked from a 
> DeserializationExceptionHandler, so I assume that this is a defect.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
