I see. When updating the FAQ, it should be clear what you mean. Your current proposal was unclear to me, and thus, it might be unclear to other users, too.
-Matthias

On 1/3/19 9:13 PM, Eric Lalonde wrote:
>
>> On Jan 2, 2019, at 6:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> Thanks for reporting this. Feel free to edit the Wiki with the FAQ
>> directly.
>>
>> What is unclear to me: what do you mean by "the state store [...] was
>> errantly scoped to the TransformerProvider, not the Transformer" ?
>>
>> I would like to understand the actual issue.
>
> See this gist:
> https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4
>
> Because MyStore is declared in the parent supplier, it will be shared
> across tasks, even though the .get() function is instantiating a new
> MyTransformer() for each task. It should have been declared in the
> MyTransformer sub-class (say, around line 15).
>
>> -Matthias
>>
>> On 12/31/18 2:36 AM, Eric Lalonde wrote:
>>> Recently I encountered IllegalStateExceptions in my KafkaTransformer. My
>>> application was exhibiting all of the behavior discussed in the FAQ
>>> (https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the
>>> section:
>>>
>>> Why do I get an IllegalStateException when accessing record metadata?
>>>
>>> However, the root cause of my problem was not a failure to
>>> instantiate a new transformer for each task. The root cause of my
>>> mistake was a bit more insidious: the state store that I instantiated
>>> was errantly scoped to the TransformerProvider, not the Transformer. Of
>>> course, once I finally realized my error, the problem was obvious, but it
>>> was not immediately obvious. May I suggest extending the FAQ to help
>>> others as well? Perhaps it would be helpful to extend the aforementioned
>>> FAQ section in the following way (highlighted text is my addition):
>>>
>>>
>>> Why do I get an IllegalStateException when accessing record metadata?
>>>
>>> If you attach a new |Processor/Transformer/ValueTransformer| to your
>>> topology using a corresponding supplier, you need to make sure that
>>> the supplier returns a /new/ instance each time |get()| is called.
>>> If you return the same object, a
>>> single |Processor/Transformer/ValueTransformer| would be shared over
>>> multiple tasks resulting in an |IllegalStateException| with error
>>> message |"This should not happen as topic() should only be called
>>> while a record is processed"| (depending on the method you are
>>> calling it could also be |partition()|, |offset()|,
>>> or |timestamp()| instead of |topic()|). Additionally, all
>>> instantiated state stores must be scoped to the inner
>>> Processor/Transformer/ValueTransformer class, and not to the parent
>>> Provider class. Scoping state stores to the parent class will result
>>> in state store re-use across tasks, which will also result
>>> in IllegalStateExceptions.
>>>
>>>
>>> Hope this saves someone else from making the same mistake :)
>>>
>>> - Eric
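
For readers who cannot open the gist, the scoping mistake described above can be sketched roughly as follows. This is only an illustration and does not reproduce the gist: it deliberately avoids the real Kafka Streams interfaces and uses plain java.util types instead. CountingTransformer, SharedStoreSupplier, ScopedStoreSupplier, and the HashMap standing in for a state store are all hypothetical names invented for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class StoreScopeSketch {

    // Hypothetical stand-in for a per-task transformer; NOT the Kafka Streams interface.
    interface CountingTransformer {
        int transform(String key);          // counts how often a key was seen
        Map<String, Integer> store();       // exposes the backing "store" for inspection
    }

    // Problematic scoping: the store is a field of the supplier, so every
    // transformer returned by get() ends up using the same store object.
    static class SharedStoreSupplier implements Supplier<CountingTransformer> {
        private final Map<String, Integer> store = new HashMap<>();

        @Override
        public CountingTransformer get() {
            return new CountingTransformer() {
                @Override public int transform(String key) { return store.merge(key, 1, Integer::sum); }
                @Override public Map<String, Integer> store() { return store; }
            };
        }
    }

    // Intended scoping: the store is a field of the transformer itself, so each
    // call to get() yields a new transformer with its own store.
    static class ScopedStoreSupplier implements Supplier<CountingTransformer> {
        @Override
        public CountingTransformer get() {
            return new CountingTransformer() {
                private final Map<String, Integer> store = new HashMap<>();

                @Override public int transform(String key) { return store.merge(key, 1, Integer::sum); }
                @Override public Map<String, Integer> store() { return store; }
            };
        }
    }

    // Simulates two tasks asking the supplier for a transformer and checks
    // whether both ended up with the very same store object.
    static boolean sharesStoreAcrossInstances(Supplier<CountingTransformer> supplier) {
        CountingTransformer taskA = supplier.get();
        CountingTransformer taskB = supplier.get();
        return taskA.store() == taskB.store();
    }

    public static void main(String[] args) {
        System.out.println("supplier-scoped store shared: "
                + sharesStoreAcrossInstances(new SharedStoreSupplier()));
        System.out.println("transformer-scoped store shared: "
                + sharesStoreAcrossInstances(new ScopedStoreSupplier()));
    }
}
```

In this sketch both suppliers return a new transformer from every get() call, which is why the mistake is easy to miss: only the location of the store field differs. Running main reports the supplier-scoped store as shared and the transformer-scoped store as not shared.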