[
https://issues.apache.org/jira/browse/KAFKA-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18017528#comment-18017528
]
Jainil Rana commented on KAFKA-19638:
-------------------------------------
Hi @mjsax, I'd like to work on this issue and submit a patch. Could you please
assign it to me?
> NPE in `Processor#init()` accessing state store
> -----------------------------------------------
>
> Key: KAFKA-19638
> URL: https://issues.apache.org/jira/browse/KAFKA-19638
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 4.2.0
> Reporter: Matthias J. Sax
> Priority: Blocker
> Fix For: 4.2.0
>
>
> As reported on the dev mailing list, we introduced a regression via
> https://issues.apache.org/jira/browse/KAFKA-13722 in the 4.1 branch. We
> reverted the commit
> ([https://github.com/apache/kafka/commit/f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d])
> for the 4.1 release and want to fix forward for the 4.2 release.
> Stacktrace:
> {code:java}
> 15:29:05 ERROR [STREAMS] KafkaStreams - stream-client [app1] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
> org.apache.kafka.streams.errors.StreamsException: failed to initialize processor random-value-processor
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:132) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:141) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:1109) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:297) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:955) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1417) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1219) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:934) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:894) [kafka-streams-4.2.0-SNAPSHOT.jar:?]
> Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.processor.internals.ProcessorRecordContext.timestamp()" because the return value of "org.apache.kafka.streams.processor.internals.InternalProcessorContext.recordContext()" is null
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:303) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:901) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:303) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:123) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     at io.littlehorse.simulations.stateful.app.RandomValueProcessor.init(RandomValueProcessor.java:21) ~[kafka-streams-stateful-unspecified.jar:?]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:124) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>     ... 8 more
> {code}
> Thanks [~eduwerc] for reporting the issue.
> We clearly have a testing gap: no existing test uses a state store within
> `Processor#init()`. We need to close this gap.
> However, there is also the question of whether using zero as a surrogate
> timestamp for this case (as the old code does) is a good solution. We could
> try to use stream-time, but on the very first startup of an application
> stream-time is not established yet either, so that would only kick the can
> down the road.
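
To make the surrogate-timestamp question in the description concrete, here is a minimal, self-contained sketch of the fallback order it discusses: use the record timestamp if a record context exists, otherwise stream-time if one is known, otherwise zero. This is not Kafka Streams code and not a proposed patch. `SurrogateTimestamp`, `RecordContext`, `UNKNOWN_STREAM_TIME`, and `resolve` are hypothetical names introduced only for illustration.

```java
public final class SurrogateTimestamp {

    /** Hypothetical stand-in for a per-record context; assumed to be null while init() runs. */
    public static final class RecordContext {
        private final long timestamp;

        public RecordContext(long timestamp) {
            this.timestamp = timestamp;
        }

        public long timestamp() {
            return timestamp;
        }
    }

    /** Sentinel meaning "no stream-time observed yet" (an assumption of this sketch). */
    public static final long UNKNOWN_STREAM_TIME = -1L;

    private SurrogateTimestamp() {
    }

    /**
     * Picks a timestamp for a store write without dereferencing a null context.
     * Order: record timestamp, then stream-time, then zero as the last resort.
     */
    public static long resolve(RecordContext context, long streamTime) {
        if (context != null) {
            return context.timestamp();
        }
        if (streamTime != UNKNOWN_STREAM_TIME) {
            return streamTime;
        }
        // First startup: neither a record nor stream-time exists yet.
        return 0L;
    }

    public static void main(String[] args) {
        System.out.println(resolve(new RecordContext(42L), 100L));   // record available
        System.out.println(resolve(null, 100L));                     // init() after a restart
        System.out.println(resolve(null, UNKNOWN_STREAM_TIME));      // init() on first startup
    }
}
```

The last branch is the case the description worries about: with no stream-time established, some surrogate value is still needed, so this ordering only narrows the problem rather than removing it.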
--
This message was sent by Atlassian Jira
(v8.20.10#820010)