> On Jan 2, 2019, at 6:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> Thanks for reporting this. Feel free to edit the Wiki with the FAQ directly.
> 
> What is unclear to me: what do you mean by "the state store [...] was
> errantly scoped to the TransformerProvider, not the Transformer" ?
> 
> I would like to understand the actual issue.

See this gist: 
https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4
 
<https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4>

Because MyStore is declared in the parent supplier, it will be shared across 
tasks, even though the .get() function is instantiating a new MyTransformer() 
for each task. It should have been declared in the MyTransformer sub-class 
(say, around, line 15).


> -Matthias
> 
> On 12/31/18 2:36 AM, Eric Lalonde wrote:
>> Recently I encountered IllegalStateExceptions in my KafkaTransformer. My
>> application was exhibiting all of the behavior as discussed in the FAQ
>> (https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the section:
>> 
>> Why do I get an IllegalStateException when accessing record metadata? 
>> 
>> However, the root cause of my problem was not a due to the lack of
>> instantiating a new transformer for each task. The root cause of my
>> mistake was a bit more insidious: the state store that I instantiated
>> was errantly scoped to the TransformerProvider, not the Transformer. Of
>> course when I finally realized my error, the problem was obvious, but it
>> was not immediately obvious. May I suggest extending the FAQ to help
>> others as well? Perhaps it would be helpful to extend the aforementioned
>> FAQ section in the following way (highlighted text is my addition):
>> 
>> 
>>    Why do I get an IllegalStateException when accessing record metadata? 
>> 
>>    If you attach a new |Processor/Transformer/ValueTransformer| to your
>>    topology using a corresponding supplier, you need to make sure that
>>    the supplier returns a /new/ instance each time |get()| is called.
>>    If you return the same object, a
>>    single |Processor/Transformer/ValueTransformer| would be shared over
>>    multiple tasks resulting in an |IllegalStateException| with error
>>    message |"This should not happen as topic() should only be called
>>    while a record is processed"| (depending on the method you are
>>    calling it could also be |partition()|, |offset()|,
>>    or |timestamp()| instead of |topic()|). Additionally, all
>>    instantiated state stores must be scoped to the inner
>>    Processor/Transformer/ValueTransformer class, and not to the parent
>>    Provider class. Scoping state stores to the parent class will result
>>    in state store re-use across tasks, which will also result
>>    in IllegalStateExceptions.
>> 
>> 
>> Hope this saves someone else from making the same mistake :)
>> 
>> - Eric
> 

Attachment: smime.p7s
Description: S/MIME Cryptographic Signature

Reply via email to