Veniamin Kalegin created KAFKA-12396: ----------------------------------------
Summary: Dedicated exception for kstreams when null key received Key: KAFKA-12396 URL: https://issues.apache.org/jira/browse/KAFKA-12396 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.6.0 Reporter: Veniamin Kalegin If a Kafka Streams application receives a record with a null key (thanks to QA), the application produces a long and confusing stack trace. It would be nice to have a shorter, more specific exception instead of {{org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=(hidden), partition=0, offset=3722, stacktrace=java.lang.NullPointerException}} at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286) at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x0000000060630fd0.get(Unknown Source) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133) at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78) at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64) at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) at org.apache.kafka.streams.processor.internals.ProcessorNode$$Lambda$1047/0x0000000060630b10.run(Unknown 
Source) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96) at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679) at org.apache.kafka.streams.processor.internals.StreamTask$$Lambda$1046/0x00000000605250f0.run(Unknown Source) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) Caused by: java.lang.NullPointerException at org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286) at 
org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x0000000060630fd0.get(Unknown Source) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851) at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133) at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78) at HPX.Utilities.Java.Streams.MessageProcessing.ErrorHandlingAggregateTransformer.transform(ErrorHandlingAggregateTransformer.java:103) at HPX.Utilities.Java.Streams.MessageProcessing.ErrorHandlingAggregateTransformer.transform(ErrorHandlingAggregateTransformer.java:29) at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64) at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) at org.apache.kafka.streams.processor.internals.ProcessorNode$$Lambda$1047/0x0000000060630b10.run(Unknown Source) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216) at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96) at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679) at org.apache.kafka.streams.processor.internals.StreamTask$$Lambda$1046/0x00000000605250f0.run(Unknown Source) at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679) ... 4 more -- This message was sent by Atlassian Jira (v8.3.4#803005)