Hi,

I have an issue in Kafka Streams when building windowed state stores (TimeWindow and SessionWindow). While processing data, the Kafka Streams application intermittently throws the error below:

ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
TraceID: unknown
CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
Message: stream-client [kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams uncaught exception handler

org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-AGGREGATE-0000000001
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
    at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
Caused by: java.lang.NullPointerException
    at org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
    at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
    ... 7 more

My understanding is that the state store is null at the moment stateStore.flush() is invoked to send data to the state store, and that this leads to the error above.

The error can be caught in the Kafka Streams uncaught exception handler, which I have set as follows:

    streams.setUncaughtExceptionHandler(throwable -> {
        LOGGER.error("Exception in streams", throwable);
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    });

I am not sure what exactly causes this issue. Everything else appears to be in order, including the Kafka cluster, and the Kafka Streams logs show no other errors apart from a few messages about node disconnections.

My questions are:

1. Under what conditions can this issue occur?
2. Is there a better way to handle this error?

Thank you in advance for any help.

--
Thanks & Regards,
Prasad, 91-9030546248.