Yevgeny created KAFKA-15463:
-------------------------------

             Summary:  StreamsException: Accessing from an unknown node
                 Key: KAFKA-15463
                 URL: https://issues.apache.org/jira/browse/KAFKA-15463
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.2.1
            Reporter: Yevgeny


This is a Spring Boot application running in Kubernetes as a stateful pod.
 
After working fine for some time, the application started to throw:
 
 
 
{code:java}
Exception in thread "xxxxxxxxxxxx-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown node
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
    at myclass1.java:28)
    at myclass2.java:48)
    at java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90)
    at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
    at java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
    at myclass3.java:48)
    at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
    at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
    at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
    at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
    at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
{code}
 
stream-thread [xxxxxxxxxxxx-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
 
 
The Transformer is a prototype bean; the supplier supplies a new instance of the 
Transformer:
 
 
{code:java}
@Override
public Transformer<String, MyItem, KeyValue<String, MyItem>> get() {
    return ctx.getBean(MyTransformer.class);
}
{code}
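
The claim that the supplier hands out a new instance every time can be checked outside Kafka Streams. The following is only a minimal sketch and not the application's code: MyTransformerStandIn, SupplierCheck and returnsFreshInstances are hypothetical names, and no Kafka or Spring types are involved. It calls get() twice and compares the returned references, which would expose a bean that is not really prototype-scoped.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for MyTransformer; no Kafka or Spring types are used.
class MyTransformerStandIn {}

class SupplierCheck {
    // Returns true when two consecutive get() calls yield different object references.
    static boolean returnsFreshInstances(Supplier<?> supplier) {
        Object first = supplier.get();
        Object second = supplier.get();
        return first != second;
    }

    public static void main(String[] args) {
        // Prototype-like behaviour: a new object per call.
        Supplier<MyTransformerStandIn> fresh = MyTransformerStandIn::new;

        // Singleton-like behaviour: the same object on every call.
        MyTransformerStandIn shared = new MyTransformerStandIn();
        Supplier<MyTransformerStandIn> reused = () -> shared;

        System.out.println("fresh:  " + returnsFreshInstances(fresh));
        System.out.println("reused: " + returnsFreshInstances(reused));
    }
}
```

In the real application the same comparison could be made against the actual supplier, for example in a unit test that calls get() twice on it.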
 
 
The only way to recover is to delete all topics used by Kafka Streams; even 
after the application is restarted, the same exception is thrown.

Question: if messages in the internal 'store-changelog' topics are deleted, or 
their offsets are manipulated, can that cause this issue?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
