> On Jan 2, 2019, at 6:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Thanks for reporting this. Feel free to edit the Wiki with the FAQ directly.
>
> What is unclear to me: what do you mean by "the state store [...] was
> errantly scoped to the TransformerProvider, not the Transformer" ?
>
> I would like to understand the actual issue.

See this gist:
https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4

Because MyStore is declared in the parent supplier, it is shared across
tasks, even though get() instantiates a new MyTransformer() for each
task. It should have been declared in the MyTransformer sub-class (say,
around line 15).

> -Matthias
>
> On 12/31/18 2:36 AM, Eric Lalonde wrote:
>> Recently I encountered IllegalStateExceptions in my KafkaTransformer. My
>> application was exhibiting all of the behavior as discussed in the FAQ
>> (https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the section:
>>
>> Why do I get an IllegalStateException when accessing record metadata?
>>
>> However, the root cause of my problem was not due to the lack of
>> instantiating a new transformer for each task. The root cause of my
>> mistake was a bit more insidious: the state store that I instantiated
>> was errantly scoped to the TransformerProvider, not the Transformer. Of
>> course when I finally realized my error, the problem was obvious, but it
>> was not immediately obvious. May I suggest extending the FAQ to help
>> others as well? Perhaps it would be helpful to extend the aforementioned
>> FAQ section in the following way (highlighted text is my addition):
>>
>>
>> Why do I get an IllegalStateException when accessing record metadata?
>>
>> If you attach a new |Processor/Transformer/ValueTransformer| to your
>> topology using a corresponding supplier, you need to make sure that
>> the supplier returns a /new/ instance each time |get()| is called.
>> If you return the same object, a
>> single |Processor/Transformer/ValueTransformer| would be shared over
>> multiple tasks resulting in an |IllegalStateException| with error
>> message |"This should not happen as topic() should only be called
>> while a record is processed"| (depending on the method you are
>> calling it could also be |partition()|, |offset()|,
>> or |timestamp()| instead of |topic()|). Additionally, all
>> instantiated state stores must be scoped to the inner
>> Processor/Transformer/ValueTransformer class, and not to the parent
>> Provider class. Scoping state stores to the parent class will result
>> in state store re-use across tasks, which will also result
>> in IllegalStateExceptions.
>>
>>
>> Hope this saves someone else from making the same mistake :)
>>
>> - Eric
>
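
To make the scoping mistake concrete, here is a minimal sketch in plain Java. It
does not use the Kafka Streams API: CountingTransformer, SharedStoreSupplier,
ScopedStoreSupplier and ScopingDemo are hypothetical names, MyStore is only a
stand-in backed by a HashMap, and java.util.function.Supplier stands in for the
transformer supplier. In this simplified model the bug shows up as state leaking
between tasks rather than as an IllegalStateException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a state store; not the Kafka Streams StateStore API.
final class MyStore {
    private final Map<String, Integer> counts = new HashMap<>();

    // Increments the counter for the key and returns the new value.
    int increment(String key) {
        return counts.merge(key, 1, Integer::sum);
    }
}

// Hypothetical stand-in for a Transformer: counts how often it has seen a key.
interface CountingTransformer {
    int transform(String key);
}

// Broken: the store is a field of the supplier, so every transformer shares it.
final class SharedStoreSupplier implements Supplier<CountingTransformer> {
    private final MyStore store = new MyStore(); // scoped to the supplier (the mistake)

    @Override
    public CountingTransformer get() {
        // A new transformer per call, but each one closes over the same store.
        return new CountingTransformer() {
            @Override
            public int transform(String key) {
                return store.increment(key);
            }
        };
    }
}

// Fixed: the store is a field of the transformer, so each task gets its own.
final class ScopedStoreSupplier implements Supplier<CountingTransformer> {
    @Override
    public CountingTransformer get() {
        return new CountingTransformer() {
            private final MyStore store = new MyStore(); // scoped to the transformer

            @Override
            public int transform(String key) {
                return store.increment(key);
            }
        };
    }
}

final class ScopingDemo {
    // Simulates two tasks that each call get() once and then process the same key.
    // Returns the count seen by the second task.
    static int secondTaskCount(Supplier<CountingTransformer> supplier) {
        CountingTransformer task0 = supplier.get();
        CountingTransformer task1 = supplier.get();
        task0.transform("k");
        return task1.transform("k");
    }

    public static void main(String[] args) {
        System.out.println("shared: " + secondTaskCount(new SharedStoreSupplier()));
        System.out.println("scoped: " + secondTaskCount(new ScopedStoreSupplier()));
    }
}
```

With the supplier-scoped store, the second task sees the first task's update
(count 2); with the transformer-scoped store, each task starts from its own
empty store (count 1). Both suppliers return a new transformer from get(), which
is why the mistake is easy to miss.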