[ https://issues.apache.org/jira/browse/KAFKA-9566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17259416#comment-17259416 ]
John Roesler commented on KAFKA-9566: ------------------------------------- Hi all, Thanks, [~mjsax] , that's a good thought. I just took a look at the relevant interfaces, and unfortunately, I don't think the new Context interface improves the situation much. The DeserializationExceptionHandler already has all the "record context" information from the raw ConsumerRecord it gets, so we must have provided the ProcessorContext to inject other information, like the application and task id, maybe the default serdes or metrics, etc. Unfortunately, the new interfaces don't have a nice "slice" of these informational methods that we could pass to the DeserializationExceptionHandler without also exposing forward(). It seems like a JavaDoc improvement is the best way forward for this right now, unless we want to want to propose a completely new DeserializationExceptionHandler that just provides the pieces of information we think it needs. > ProcessorContextImpl#forward throws NullPointerException if invoked from > DeserializationExceptionHandler > -------------------------------------------------------------------------------------------------------- > > Key: KAFKA-9566 > URL: https://issues.apache.org/jira/browse/KAFKA-9566 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.0.0 > Reporter: Tomas Mi > Priority: Minor > > Hi, I am trying to implement custom DeserializationExceptionHandler which > would forward an exception to downstream processor(s), but > ProcessorContextImpl#forward throws a NullPointerException if invoked from > this custom handler. > Handler implementation: > {code:title=MyDeserializationExceptionHandler.java} > public class MyDeserializationExceptionHandler implements > DeserializationExceptionHandler { > @Override > public void configure(Map<String, ?> configs) { > } > @Override > public DeserializationHandlerResponse handle(ProcessorContext context, > ConsumerRecord<byte[], byte[]> record, Exception exception) { > context.forward(null, exception, To.child("error-processor")); > return DeserializationHandlerResponse.CONTINUE; > } > } > {code} > Handler is wired as default deserialization exception handler: > {code} > private TopologyTestDriver initializeTestDriver(StreamsBuilder > streamBuilder) { > Topology topology = streamBuilder.build(); > Properties props = new Properties(); > props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, > "my-test-application"); > props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > "dummy:1234"); > props.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE); > > props.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, > MyDeserializationExceptionHandler.class.getName()); > return new TopologyTestDriver(topology, props); > } > {code} >  > Exception stacktrace: > {noformat} > org.apache.kafka.streams.errors.StreamsException: Fatal user code error in > deserialization error callback > at > org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:76) > at > org.apache.kafka.streams.processor.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160) > at > org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:101) > at > org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:136) > at > org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:742) > at > org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:392) > ... > Caused by: java.lang.NullPointerException > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:165) > at > MyDeserializationExceptionHandler.handle(NewExceptionHandlerTest.java:204) > at > org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:70) > ... 33 more > {noformat} > Neither DeserializationExceptionHandler, nor ProcessorContext javadocs > mention that ProcessorContext#forward(...) must not be invoked from > DeserializationExceptionHandler, so I assume that this is a defect. -- This message was sent by Atlassian Jira (v8.3.4#803005)