Hi,
I have an issue in Kafka Streams when constructing windowed state stores
(TimeWindow and SessionWindow). While processing data, the Kafka Streams
application intermittently throws the error below:
ThreadName: kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
Message: stream-client [kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-AGGREGATE-0000000001
  at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
  at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
  at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
  at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
  at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
Caused by: java.lang.NullPointerException
  at org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.<init>(TimestampedTupleForwarder.java:46)
  at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
  ... 7 more
My understanding is that the state store is null at the point where
stateStore.flush() is invoked to write data to the store, which leads to
the error above. The error is caught by the handler registered with
setUncaughtExceptionHandler:
  streams.setUncaughtExceptionHandler(throwable -> {
      LOGGER.error("Exception in streams", throwable);
      return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
  });
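For illustration only, here is a minimal sketch of a more selective handler policy: it logs the root cause of the exception and caps how many times a thread is replaced before asking for a client shutdown. It does not address the NullPointerException itself. The class name HandlerSketch, the helper rootCause, the limit MAX_REPLACEMENTS and the local Response enum are all assumptions made for this sketch; the enum only stands in for StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse so that the code compiles without kafka-streams on the classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;

class HandlerSketch {
    // Stand-in for StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
    // (assumption: used here only so the sketch has no Kafka dependency).
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT }

    // Hypothetical cap on thread replacements; the value is arbitrary.
    static final int MAX_REPLACEMENTS = 5;

    private final AtomicInteger replacements = new AtomicInteger();

    // Walk the cause chain down to the innermost exception.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    // Replace the failed thread up to MAX_REPLACEMENTS times, then shut down.
    Response handle(Throwable throwable) {
        Throwable root = rootCause(throwable);
        System.err.println("Exception in streams, root cause: " + root);
        int count = replacements.incrementAndGet();
        return count <= MAX_REPLACEMENTS
                ? Response.REPLACE_THREAD
                : Response.SHUTDOWN_CLIENT;
    }

    public static void main(String[] args) {
        HandlerSketch sketch = new HandlerSketch();
        Throwable wrapped = new RuntimeException(
                "failed to initialize processor", new NullPointerException());
        System.out.println(sketch.handle(wrapped));
    }
}
```

In a real application the same decision would be returned from the lambda passed to streams.setUncaughtExceptionHandler, using the actual StreamThreadExceptionResponse constants instead of the stand-in enum.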
I'm uncertain about the exact reason for this issue. Everything seems to be
in order, including the Kafka cluster, and there are no errors in the Kafka
Streams logs except for a few messages indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen?
I would like to express my gratitude in advance for any assistance provided.
-- 


Thanks & Regards,
Prasad,
91-9030546248.
