Hi Andrew,

what version of Kafka Streams do you use?

Since 2.7 there is a null check for the source node [1].

The following ticket might be related:
https://issues.apache.org/jira/browse/KAFKA-10205


Best,
Bruno


[1] https://github.com/apache/kafka/blob/20028e24cca91422b8f02fdbf45d2cd9ef24c901/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L1285



On 06.06.23 01:55, An, Hongguo (CORP) wrote:
Hi:
Every time, I restart my kafka stream app, it failed, and I have to reset the 
app, the error is:

org.apache.kafka.streams.errors.StreamsException: stream-thread 
[microapi-unified-profile-data-sync.dit-1067669c-2364-440d-a1c3-69ad45cc301d-StreamThread-4]
 Failed to rebalance.
                 at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:972)
                 at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
                 at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
                 at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: java.lang.NullPointerException: Cannot invoke 
"org.apache.kafka.streams.processor.internals.SourceNode.getTimestampExtractor()" because 
"source" is null
                 at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:230)
                 at 
org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:172)
                 at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:459)
                 at 
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:410)
                 at 
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:395)
                 at 
org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:148)
                 at 
org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:107)
                 at 
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:294)
                 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:285)
                 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
                 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
                 at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
                 at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
                 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)
                 at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
                 at 
brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:84)
                 at 
brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:78)
                 at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)

It works after reset, please help.

Thanks
Andrew


This message and any attachments are intended only for the use of the addressee 
and may contain information that is privileged and confidential. If the reader 
of the message is not the intended recipient or an authorized representative of 
the intended recipient, you are hereby notified that any dissemination of this 
communication is strictly prohibited. If you have received this communication 
in error, notify the sender immediately by return email and delete the message 
and any attachments from your system.

Reply via email to