Re: Why do I get an IllegalStateException when accessing record metadata?

2019-01-06 Thread Peter Levart

Hi,

I suggest the following:

If you attach a new Processor/Transformer/ValueTransformer to your 
topology using a corresponding supplier, you need to make sure that the 
supplier returns a new instance each time get() is called. If you return 
the same object, a single Processor/Transformer/ValueTransformer would 
be shared over multiple tasks resulting in an IllegalStateException with 
error message "This should not happen as topic() should only be called 
while a record is processed" (depending on the method you are calling, it 
could also be partition(), offset(), or timestamp() instead of topic()).
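To make the failure mode concrete, here is a toy model in plain Java with no Kafka dependency. ToyContext and ToyTransformer are hypothetical stand-ins, not Kafka Streams classes. Each task has its own context that only allows topic() while that task is processing a record, and the transformer remembers the context it was handed in init(). If the supplier returns one shared instance, the second task's init() overwrites the first task's context. Processing a record in the first task then calls topic() on an idle context, which throws.

```java
import java.util.function.Supplier;

public class SharedInstanceDemo {
    // Hypothetical stand-in for a per-task ProcessorContext (NOT the Kafka API).
    // Record metadata such as topic() is only available while this task processes a record.
    static class ToyContext {
        private final String topicOfTask;
        private boolean processing = false;

        ToyContext(String topicOfTask) { this.topicOfTask = topicOfTask; }

        String topic() {
            if (!processing) {
                throw new IllegalStateException(
                    "This should not happen as topic() should only be called while a record is processed");
            }
            return topicOfTask;
        }
    }

    // Hypothetical stand-in for a Transformer that keeps the context it receives in init().
    static class ToyTransformer {
        private ToyContext context;

        void init(ToyContext ctx) { this.context = ctx; }

        String transform(String value) { return context.topic() + ":" + value; }
    }

    // Simulates one task processing one record with its own context.
    static String process(ToyContext ctx, ToyTransformer transformer, String value) {
        ctx.processing = true;
        try {
            return transformer.transform(value);
        } finally {
            ctx.processing = false;
        }
    }

    // Two tasks each ask the supplier for a transformer, then task A processes a record.
    static String run(Supplier<ToyTransformer> supplier) {
        ToyContext ctxA = new ToyContext("topic-a");
        ToyContext ctxB = new ToyContext("topic-b");
        ToyTransformer forA = supplier.get();
        forA.init(ctxA);
        ToyTransformer forB = supplier.get();
        forB.init(ctxB); // with a shared instance, this overwrites task A's context
        return process(ctxA, forA, "v1");
    }

    public static void main(String[] args) {
        // Correct: a new instance per get() call.
        System.out.println(run(ToyTransformer::new)); // topic-a:v1

        // Wrong: the same instance for every get() call.
        ToyTransformer shared = new ToyTransformer();
        try {
            run(() -> shared);
        } catch (IllegalStateException e) {
            System.out.println("shared instance: " + e.getMessage());
        }
    }
}
```

In this sketch, `run(ToyTransformer::new)` returns "topic-a:v1", while `run(() -> shared)` fails with the same kind of IllegalStateException the FAQ describes.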


...be extended with:

Additionally, all state stores used in a particular Processor/Transformer 
instance's process/transform callbacks must be obtained from the 
ProcessorContext of that same Processor/Transformer instance (typically in 
its init() method).


The same holds for scheduled Punctuator callbacks that use state stores: 
they should only use state stores obtained from the ProcessorContext of the 
same Processor/Transformer instance that scheduled them.
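A sketch of the Punctuator rule, again using hypothetical toy classes rather than the Kafka Streams API. The callback is a lambda created in init(), so it reads the store field of the instance that scheduled it and can only clear that task's store. The real ProcessorContext.schedule() takes an interval, a PunctuationType and a Punctuator; a plain Runnable is assumed here for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PunctuatorScopeDemo {
    // Hypothetical per-task context (NOT the Kafka API): owns one store and the
    // callbacks scheduled on it.
    static class ToyContext {
        final Map<String, Long> store = new HashMap<>();
        final List<Runnable> punctuators = new ArrayList<>();

        Map<String, Long> getStateStore(String name) { return store; }
        void schedule(Runnable punctuator) { punctuators.add(punctuator); }
        void punctuate() { punctuators.forEach(Runnable::run); }
    }

    // Hypothetical transformer: counts keys; its scheduled callback clears its counts.
    static class CountingTransformer {
        private Map<String, Long> store; // obtained from THIS instance's context only

        void init(ToyContext ctx) {
            store = ctx.getStateStore("counts");
            // The callback reads this instance's own field, so it only ever
            // touches the store of the task that scheduled it.
            ctx.schedule(() -> store.clear());
        }

        void transform(String key) { store.merge(key, 1L, Long::sum); }
    }

    public static void main(String[] args) {
        ToyContext ctxA = new ToyContext();
        ToyContext ctxB = new ToyContext();
        CountingTransformer forA = new CountingTransformer();
        forA.init(ctxA);
        CountingTransformer forB = new CountingTransformer();
        forB.init(ctxB);

        forA.transform("k");
        forB.transform("k");
        ctxA.punctuate(); // only task A's punctuator runs

        System.out.println(ctxA.store.size()); // 0: task A's store was cleared
        System.out.println(ctxB.store.size()); // 1: task B's store is untouched
    }
}
```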



I'm sure about the first statement, and intuitively I think the second 
should also hold. The Processor/Transformer API is structured in a way that 
guides you toward such usage, but it can't prevent invalid usage such as 
Eric's.


What Eric did wrong was to assign a reference to the state store obtained 
in Transformer.init() to a shared field outside the Transformer instance. 
This is wrong in several ways, including:
- the field's value was overwritten each time a new Transformer instance 
had its init() method invoked, so only the last assigned value was kept
- the field could be written from multiple threads while also being read 
from multiple threads, which could lead to data races
(here I'm not considering the advice in the additional statements proposed 
above)
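The first problem can be reproduced with a small, self-contained model. The classes below are hypothetical: a HashMap stands in for a task's state store and ToyContext for ProcessorContext. BrokenSupplier mirrors the mistake of keeping the store reference in a field shared by every transformer the supplier creates; FixedSupplier keeps it in each transformer instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class StoreScopeDemo {
    // Hypothetical per-task context (NOT the Kafka API); a HashMap stands in for the task's store.
    static class ToyContext {
        final Map<String, Long> store = new HashMap<>();
        Map<String, Long> getStateStore(String name) { return store; }
    }

    // Hypothetical transformer interface: transform() increments and returns a per-key count.
    interface ToyTransformer {
        void init(ToyContext ctx);
        long transform(String key);
    }

    // WRONG: the store reference lives in the supplier and is shared by every
    // transformer it creates; each init() overwrites it, so the last one wins.
    static class BrokenSupplier implements Supplier<ToyTransformer> {
        private Map<String, Long> store;

        @Override
        public ToyTransformer get() {
            return new ToyTransformer() {
                @Override
                public void init(ToyContext ctx) { store = ctx.getStateStore("counts"); }
                @Override
                public long transform(String key) { return store.merge(key, 1L, Long::sum); }
            };
        }
    }

    // RIGHT: each transformer instance keeps the store from its own context.
    static class FixedSupplier implements Supplier<ToyTransformer> {
        @Override
        public ToyTransformer get() {
            return new ToyTransformer() {
                private Map<String, Long> store;
                @Override
                public void init(ToyContext ctx) { store = ctx.getStateStore("counts"); }
                @Override
                public long transform(String key) { return store.merge(key, 1L, Long::sum); }
            };
        }
    }

    // Two tasks initialise their transformers, then task A processes one record.
    // Returns how many keys ended up in task A's own store.
    static int taskAStoreSize(Supplier<ToyTransformer> supplier) {
        ToyContext ctxA = new ToyContext();
        ToyContext ctxB = new ToyContext();
        ToyTransformer forA = supplier.get();
        forA.init(ctxA);
        ToyTransformer forB = supplier.get();
        forB.init(ctxB);
        forA.transform("k");
        return ctxA.store.size();
    }

    public static void main(String[] args) {
        System.out.println(taskAStoreSize(new FixedSupplier()));  // 1: write went to task A's store
        System.out.println(taskAStoreSize(new BrokenSupplier())); // 0: write leaked into task B's store
    }
}
```

The toy does not reproduce the exception itself; it only shows where the writes go. With the broken supplier, task A's write lands in task B's store because B's init() ran last.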


So my advice would be to keep an eye on correct programming in general 
and let the API lead you.


Regards, Peter





Re: Why do I get an IllegalStateException when accessing record metadata?

2019-01-03 Thread Matthias J. Sax
I see.

When updating the FAQ, it should be clear what you mean. Your current
proposal was unclear to me, and thus, it might be unclear to other
users, too.


-Matthias






Re: Why do I get an IllegalStateException when accessing record metadata?

2019-01-03 Thread Eric Lalonde


> On Jan 2, 2019, at 6:31 AM, Matthias J. Sax wrote:
> 
> Thanks for reporting this. Feel free to edit the Wiki with the FAQ directly.
> 
> What is unclear to me: what do you mean by "the state store [...] was
> errantly scoped to the TransformerProvider, not the Transformer" ?
> 
> I would like to understand the actual issue.

See this gist:
https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4

Because MyStore is declared in the parent supplier, it is shared across 
tasks, even though the get() method instantiates a new MyTransformer() 
for each task. It should have been declared in the MyTransformer class 
(say, around line 15).







Re: Why do I get an IllegalStateException when accessing record metadata?

2019-01-02 Thread Matthias J. Sax
Thanks for reporting this. Feel free to edit the Wiki with the FAQ directly.

What is unclear to me: what do you mean by "the state store [...] was
errantly scoped to the TransformerProvider, not the Transformer" ?

I would like to understand the actual issue.


-Matthias






Re: Why do I get an IllegalStateException when accessing record metadata?

2018-12-30 Thread Eric Lalonde
Recently I encountered IllegalStateExceptions in my KafkaTransformer. My 
application exhibited all of the behavior described in the FAQ 
(https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the section:

Why do I get an IllegalStateException when accessing record metadata? 

However, the root cause of my problem was not a failure to instantiate a new 
transformer for each task. The root cause was more insidious: the state store 
that I instantiated was errantly scoped to the TransformerProvider, not the 
Transformer. In hindsight the problem was obvious, but it was not obvious at 
first. May I suggest extending the FAQ to help others as well? Perhaps it would 
help to extend the aforementioned FAQ section as follows (my addition is the 
last two sentences, starting with "Additionally"):

Why do I get an IllegalStateException when accessing record metadata? 

If you attach a new Processor/Transformer/ValueTransformer to your topology 
using a corresponding supplier, you need to make sure that the supplier returns 
a new instance each time get() is called. If you return the same object, a 
single Processor/Transformer/ValueTransformer would be shared over multiple 
tasks, resulting in an IllegalStateException with the error message "This should 
not happen as topic() should only be called while a record is processed" 
(depending on the method you are calling, it could also be partition(), 
offset(), or timestamp() instead of topic()). Additionally, all instantiated 
state stores must be scoped to the inner Processor/Transformer/ValueTransformer 
class, and not to the parent Provider class. Scoping state stores to the parent 
class will result in state store reuse across tasks, which will also result in 
IllegalStateExceptions.

Hope this saves someone else from making the same mistake :)

- Eric
