Hi,

I suggest the following:

If you attach a new Processor/Transformer/ValueTransformer to your topology using a corresponding supplier, you need to make sure that the supplier returns a new instance each time get() is called. If you return the same object, a single Processor/Transformer/ValueTransformer would be shared over multiple tasks resulting in an IllegalStateException with error message "This should not happen as topic() should only be called while a record is processed" (depending on the method you are calling it could also be partition(), offset(), or timestamp() instead of topic()).

...is appended with:

Additionally, all state stores to be used in a particular Processor/Transformer instance's process/transform callbacks must be obtained from ProcessorContext in the same Processor/Transformer instance (typically in its init() method).

The same holds for scheduled Punctuator callbacks if they use any state stores - they should only use the state stores obtained from the same Processor/Transformer instance's ProcessorContext as they were scheduled from.


I'm sure about the first statement, and intuitively I think the second should hold as well. The Processor/Transformer API is structured in a way that encourages such usage, but it can't prevent invalid usage such as Eric's.

What Eric did wrong was to assign a reference to the state store obtained in Transformer.init() to a shared field outside the Transformer instance. This is wrong in several ways, among them:

- the field's value got overwritten each time a new Transformer instance had its init() method invoked, so only the last value assigned was kept
- the field's value might have been overwritten by one thread while being read by others, which could lead to data races (here I'm not considering any advice given by the additional appended statements above)
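
To make the overwriting problem concrete, here is a minimal plain-Java sketch. It does not use the Kafka Streams API: FakeStore, FakeContext and FakeTransformer are hypothetical stand-ins I made up to model a per-task store, a per-task context and a transformer, and the task ids "0_0" and "0_1" are just example values. It only illustrates the field-scoping issue, not the threading one.

```java
import java.util.function.Supplier;

// Plain-Java model of the scoping mistake. None of these types are Kafka Streams classes.
final class StoreScopingSketch {

    /** Hypothetical stand-in for a state store that belongs to one task. */
    static final class FakeStore {
        final String taskId;
        FakeStore(String taskId) { this.taskId = taskId; }
    }

    /** Hypothetical stand-in for the per-task context that hands out that task's store. */
    static final class FakeContext {
        private final FakeStore store;
        FakeContext(String taskId) { this.store = new FakeStore(taskId); }
        FakeStore getStateStore() { return store; }
    }

    /** Hypothetical stand-in for a transformer with an init() callback. */
    interface FakeTransformer {
        void init(FakeContext context);
        String storeTaskId();
    }

    /** Broken: the store reference is a field of the supplier, shared by every transformer. */
    static final class BrokenSupplier implements Supplier<FakeTransformer> {
        private FakeStore store; // shared field, overwritten by every init() call

        @Override
        public FakeTransformer get() {
            return new FakeTransformer() {
                @Override public void init(FakeContext context) { store = context.getStateStore(); }
                @Override public String storeTaskId() { return store.taskId; }
            };
        }
    }

    /** Correct: each transformer instance keeps its own store reference. */
    static final class CorrectSupplier implements Supplier<FakeTransformer> {
        @Override
        public FakeTransformer get() {
            return new FakeTransformer() {
                private FakeStore store; // instance field, one per transformer

                @Override public void init(FakeContext context) { store = context.getStateStore(); }
                @Override public String storeTaskId() { return store.taskId; }
            };
        }
    }

    /** Initializes two transformers for tasks "0_0" and "0_1" and reports which store the first one ends up using. */
    static String firstTransformerSees(Supplier<FakeTransformer> supplier) {
        FakeTransformer first = supplier.get();
        FakeTransformer second = supplier.get();
        first.init(new FakeContext("0_0"));
        second.init(new FakeContext("0_1"));
        return first.storeTaskId();
    }
}
```

With BrokenSupplier, firstTransformerSees() returns "0_1": the first transformer ends up using the store that belongs to the second task, because the second init() call overwrote the shared field. With CorrectSupplier it returns "0_0", as intended.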

So my advice would be to keep an eye on correct programming in general and let the API lead you.

Regards, Peter

On 1/3/19 9:50 PM, Matthias J. Sax wrote:
I see.

When updating the FAQ, it should be clear what you mean. Your current
proposal was unclear to me, and thus, it might be unclear to other
users, too.


-Matthias

On 1/3/19 9:13 PM, Eric Lalonde wrote:

On Jan 2, 2019, at 6:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for reporting this. Feel free to edit the Wiki with the FAQ
directly.

What is unclear to me: what do you mean by "the state store [...] was
errantly scoped to the TransformerProvider, not the Transformer" ?

I would like to understand the actual issue.
See this gist:
https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4

Because MyStore is declared in the parent supplier, it will be shared
across tasks, even though the .get() function is instantiating a new
MyTransformer() for each task. It should have been declared in the
MyTransformer sub-class (say, around, line 15).


-Matthias

On 12/31/18 2:36 AM, Eric Lalonde wrote:
Recently I encountered IllegalStateExceptions in my KafkaTransformer. My
application was exhibiting all of the behavior as discussed in the FAQ
(https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the
section:

Why do I get an IllegalStateException when accessing record metadata?

However, the root cause of my problem was not due to the lack of
instantiating a new transformer for each task. The root cause of my
mistake was a bit more insidious: the state store that I instantiated
was errantly scoped to the TransformerProvider, not the Transformer. Of
course when I finally realized my error, the problem was obvious, but it
was not immediately obvious. May I suggest extending the FAQ to help
others as well? Perhaps it would be helpful to extend the aforementioned
FAQ section in the following way (highlighted text is my addition):


    Why do I get an IllegalStateException when accessing record metadata?

    If you attach a new |Processor/Transformer/ValueTransformer| to your
    topology using a corresponding supplier, you need to make sure that
    the supplier returns a /new/ instance each time |get()| is called.
    If you return the same object, a
    single |Processor/Transformer/ValueTransformer| would be shared over
    multiple tasks resulting in an |IllegalStateException| with error
    message |"This should not happen as topic() should only be called
    while a record is processed"| (depending on the method you are
    calling it could also be |partition()|, |offset()|,
    or |timestamp()| instead of |topic()|). Additionally, all
    instantiated state stores must be scoped to the inner
    Processor/Transformer/ValueTransformer class, and not to the parent
    Provider class. Scoping state stores to the parent class will result
    in state store re-use across tasks, which will also result
    in IllegalStateExceptions.


Hope this saves someone else from making the same mistake :)

- Eric
