[ 
https://issues.apache.org/jira/browse/KAFKA-15463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17779715#comment-17779715
 ] 

A. Sophie Blee-Goldman commented on KAFKA-15463:
------------------------------------------------

[~yevsh]  Well the intended pattern is to get the StateStore from the context 
during init and then save a reference to the store for your 
processor/transformer. That's why we pass the context in to #init but not to 
#process (also, while it's not a significant overhead, it's definitely more 
efficient to do the state store lookup only once, during init, and not on every 
single record that's processed)

That said, I think it's a fair question as to why this is causing an error, and 
why the error doesn't happen every time. The fact that it generally only 
appears when you have more than one partition/task per application instance 
makes me think there is some state that is somehow being shared between the 
different tasks. This would definitely be explained by returning the same 
transformer instance each time, but based on your latest update, you are 
definitely returning a new transformer each time and still seeing the issue, 
right?

Assuming so, I'm inclined to believe the issue is with Spring. I vaguely recall 
a similar problem being reported in the past, which was ultimately because a 
Spring object was unexpectedly/unknowingly acting as a static/singleton class. 
This was resulting in the context – of which there is supposed to be exactly 
one per task – being shared between different instances of the task/processor 
nodes.

I'm pretty sure that's what is going on here as well. I'm guessing that the 
MyService class is a Spring bean? If so, then it's effectively a singleton and 
will be shared by each of the individual Transformer instances in your 
application, meaning the different tasks will be overwriting each other's
context when invoking #init on this transformer. So the context you retrieve 
from the myServiceBean during #process may not be the same as the context you 
saved to it in #init, causing it to throw this error since only the context 
corresponding to the task that is currently being processed will have the 
currentNode set to a non-null value.
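To make that failure mode concrete, here is a minimal plain-Java sketch (no Kafka or Spring dependencies; FakeContext and SharedService are hypothetical stand-ins, not real Kafka Streams classes) of how a singleton holding per-task state gets clobbered by whichever task calls #init last:

```java
// Hypothetical stand-in for ProcessorContext, just enough to show the shape.
class FakeContext {
    final String taskId;
    FakeContext(String taskId) { this.taskId = taskId; }
}

// Stand-in for the Spring singleton bean: it keeps the context as a field,
// which is the bug -- that field is shared across every task.
class SharedService {
    FakeContext context;
    void init(FakeContext ctx) { this.context = ctx; }
    String currentTask() { return context.taskId; }
}

public class SingletonDemo {
    public static void main(String[] args) {
        SharedService singleton = new SharedService();

        // Two tasks each call #init on "their" transformer, but both
        // writes land on the same singleton, so the last writer wins.
        singleton.init(new FakeContext("task-0"));
        singleton.init(new FakeContext("task-1"));

        // task-0 now observes task-1's context -- the mixed-up state that
        // surfaces as "Accessing from an unknown node" in the real app.
        System.out.println(singleton.currentTask()); // prints "task-1"
    }
}
```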

Even if you made the change I originally suggested but saved the StateStore 
reference by passing it to the MyService bean, it wouldn't work – it might not 
throw this error but you would potentially be reading and writing to the wrong 
copy of a state store for a given task, which is even worse. The only way to 
solve this is by removing the Spring bean entirely or at least refactoring it 
so that it doesn't hold any internal state and has to have the full application 
state for that task passed in to it every time – in other words you just need 
to make sure to keep all the objects used by a given transformer completely 
local to that instance. Here is my recommendation for how to implement your 
transformer class – hope this helps!


{code:java}
private final MyService myServiceBean;
private StateStore myStore;

@Override
public void init(ProcessorContext context) {
    myStore = context.getStateStore(STORE_NAME);
}

@Override
public KeyValue<String, MyItem> transform(String key, MyItem myItem) {
    // transform must return a value; have the bean return the KeyValue
    return myServiceBean.process(myItem, myStore);
}
{code}

Basically modify the MyService bean to accept the StateStore to operate on as a 
parameter to its #process method. And definitely keep the fix in which you 
return a new Transformer instance each time instead of reusing the same one.
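To illustrate the "pass the state in" refactor in isolation, here is a hedged plain-Java sketch (FakeStore and StatelessService are hypothetical stand-ins, not Kafka Streams types) of a service bean that holds no per-task state of its own, so a single shared instance can safely serve every task:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a per-task state store.
class FakeStore {
    private final Map<String, String> data = new HashMap<>();
    void put(String k, String v) { data.put(k, v); }
    String get(String k) { return data.get(k); }
}

// Stateless service: it keeps no store or context field, so it is safe as a
// singleton -- each call names the store it should operate on.
class StatelessService {
    String process(String item, FakeStore store) {
        store.put(item, "seen");
        return item.toUpperCase();
    }
}

public class StatelessDemo {
    public static void main(String[] args) {
        StatelessService service = new StatelessService();
        FakeStore storeForTask0 = new FakeStore();
        FakeStore storeForTask1 = new FakeStore();

        // The same service instance serves both tasks without interference,
        // because the task-local store arrives as an argument every time.
        System.out.println(service.process("item-a", storeForTask0)); // ITEM-A
        System.out.println(service.process("item-b", storeForTask1)); // ITEM-B
    }
}
```

Each transformer keeps its own store reference (obtained in #init) and hands it to the shared bean on every call, mirroring the recommendation above.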

Let me know if you have any questions! I'm going to close the ticket since I'm 
fairly confident in this solution having seen the same problem before, but 
definitely please do reopen it if you implement the suggested fix and still 
encounter an error. Good luck!

>  StreamsException: Accessing from an unknown node
> -------------------------------------------------
>
>                 Key: KAFKA-15463
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15463
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.2.1
>            Reporter: Yevgeny
>            Priority: Major
>
> After some time application was working fine, starting to get:
>  
> This is springboot application runs in kubernetes as stateful pod.
>  
>  
>  
> {code:java}
>   Exception in thread 
> "xxxxxxxxxxxx-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1" 
> org.apache.kafka.streams.errors.StreamsException: Accessing from an unknown 
> node at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:162)
>  at myclass1.java:28) at myclass2.java:48) at 
> java.base/java.util.stream.MatchOps$1MatchSink.accept(MatchOps.java:90) at 
> java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(ArrayList.java:1602)
>  at 
> java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
>  at 
> java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
>  at 
> java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>  at 
> java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:230)
>  at 
> java.base/java.util.stream.MatchOps$MatchOp.evaluateSequential(MatchOps.java:196)
>  at 
> java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>  at 
> java.base/java.util.stream.ReferencePipeline.allMatch(ReferencePipeline.java:637)
>  at myclass3.java:48) at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>  at 
> org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:780)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:780)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:711)
>  at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
>  at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:589)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551)
>    {code}
>  
> stream-thread 
> [xxxxxxxxxxxx-ddf9819f-d6c7-46ce-930e-cd923e1b3c2c-StreamThread-1] State 
> transition from PENDING_SHUTDOWN to DEAD
>  
>  
> Transformer is Prototype bean, the supplier supplys new instance of the 
> Transformer:
>  
>  
> {code:java}
> @Override public Transformer<String, MyItem, KeyValue<String, MyItem>> get() 
> {     return ctx.getBean(MyTransformer.class); }{code}
>  
>  
> The only way to recover is to delete all topics used by kafkastreams, even if 
> application restarted same exception is thrown.
> *If messages in internal topics of 'store-changelog'  are deleted/offset 
> manipulated, can it cause the issue?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
