[ https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17768141#comment-17768141 ]
A. Sophie Blee-Goldman commented on KAFKA-15463: ------------------------------------------------ [~yevsh] try moving the KeyValueStore<String, SomeOtherItem> store = context.getStateStore("storeName"); from the #process method to the #init method of your processor > StreamsException: Accessing from an unknown node > ------------------------------------------------- > > Key: KAFKA-15463 > URL: https://issues.apache.org/jira/browse/KAFKA-15463 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 3.2.1 > Reporter: Yevgeny > Priority: Major > > After some time application was working fine, starting to get: > > This is springboot application runs in kubernetes as stateful pod. > > > > {code:java} > Exception in thread > "xxxxxxxxxxxx-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" > org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown > node at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162) > at myclass1.java:28) at myclass2.java:48) at > java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at > java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602) > at > java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129) > at > java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527) > at > java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513) > at > java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) > at > java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230) > at > java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196) > at > java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637) > at myclass3.java:48) at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49) > at > org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38) > at > org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809) > at > org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551) > {code} > > stream-thread > [xxxxxxxxxxxx-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State > transition from PENDING_SHUTDOWN to DEAD > > > Transformer is Prototype bean, the supplier supplys new instance of the > Transformer: > > > {code:java} > @Override public Transformer<String, MyItem, KeyValue<String, MyItem>> get() > { return ctx.getBean(MyTransformer.class); }{code} > > > The only way to recover is to delete all topics used by kafkastreams, even if > application restarted same exception is thrown. > *If messages in internal topics of 'store-changelog' are deleted/offset > manipulated, can it cause the issue? -- This message was sent by Atlassian Jira (v8.20.10#820010)