Not sure either, but it sounds like a bug to me. Can you reproduce this
reliably? What version are you using?
It would be best if you could file a Jira ticket and we can take it from
there.
-Matthias
On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote:
Hi ,
I have an issue in kafka-streams while constructing kafka-streams state
store windows(TimeWindow and SessionWindow). While kafka-streams
processing data sometimes intermittent kafka-streams process throwing
below
error
ThreadName:
kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
TraceID: unknown CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
Message: stream-client [ kafka-streams-exec-0-test-store
-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams
uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize
processor KSTREAM-AGGREGATE-0000000001
at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
at
org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
at
org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
at
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
at
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
Caused by: java.lang.NullPointerException
at
org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
at
org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
... 7 more
Here my understanding is state-store is null and at that time
stateStore.flush() gets invoked to send the data to stateStore, this
leads
to the above error. This error can be caught inside kafka-streams
setUncaughtExceptionHandler.
streams.setUncaughtExceptionHandler(throwable -> {
LOGGER.error("Exception in streams", throwable);
return
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
I'm uncertain about the exact reason for this issue. Everything seems to
be
in order, including the Kafka cluster, and there are no errors in the
Kafka
Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen ?
I would like to express my gratitude in advance for any assistance
provided.