Hi,
I suggest that the existing FAQ text:
If you attach a new Processor/Transformer/ValueTransformer to your
topology using a corresponding supplier, you need to make sure that the
supplier returns a new instance each time get() is called. If you return
the same object, a single Processor/Transformer/ValueTransformer would
be shared over multiple tasks resulting in an IllegalStateException with
error message "This should not happen as topic() should only be called
while a record is processed" (depending on the method you are calling, it
could also be partition(), offset(), or timestamp() instead of topic()).
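To make the supplier rule concrete, here is a minimal, self-contained sketch. It does not use the Kafka Streams library and does not reproduce the IllegalStateException itself. It only shows the sharing that leads to it. The hypothetical class CountingTransformer stands in for a Transformer that keeps per-task state, and java.util.function.Supplier stands in for TransformerSupplier. The broken supplier hands out one object to every caller, while the correct one builds a new instance on every get() call.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a Kafka Streams Transformer that keeps
// mutable per-task state (here, a simple record counter).
class CountingTransformer {
    private long seen = 0;

    long transform(String value) {
        return ++seen; // updates this instance's own state
    }
}

class SupplierDemo {
    // BROKEN: always returns the same object, so every task that calls
    // get() ends up sharing one transformer and its state.
    static Supplier<CountingTransformer> sharedSupplier() {
        CountingTransformer single = new CountingTransformer();
        return () -> single;
    }

    // CORRECT: a fresh transformer per get() call, i.e. one per task.
    static Supplier<CountingTransformer> freshSupplier() {
        return CountingTransformer::new;
    }

    // Simulates two tasks that each ask the supplier for a transformer and
    // process one record; returns the counter value seen by the second task.
    static long secondTaskCount(Supplier<CountingTransformer> supplier) {
        CountingTransformer task0 = supplier.get();
        CountingTransformer task1 = supplier.get();
        task0.transform("a");
        return task1.transform("b");
    }

    public static void main(String[] args) {
        System.out.println(secondTaskCount(sharedSupplier())); // 2: state leaked across tasks
        System.out.println(secondTaskCount(freshSupplier()));  // 1: tasks are isolated
    }
}
```

A method reference such as CountingTransformer::new is the simplest way to guarantee a new instance per call. Capturing a pre-built object in a lambda is exactly the pattern to avoid.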
...be extended with:
Additionally, all state stores to be used in a particular
Processor/Transformer instance's process/transform callbacks must be
obtained from the ProcessorContext of that same Processor/Transformer
instance (typically in its init() method).
The same holds for scheduled Punctuator callbacks if they use any state
stores: they should only use the state stores obtained from the
ProcessorContext of the same Processor/Transformer instance from which
they were scheduled.
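The two appended rules can be illustrated with a plain-Java sketch. This is not the Kafka Streams API. StoreStub, ContextStub, and StoreBackedTransformer are hypothetical stand-ins, and ContextStub only loosely mirrors the shape of ProcessorContext (a getStateStore(String) lookup plus a schedule method for punctuators). Each simulated task has its own context and store. The transformer fetches its store from its own context in init() and keeps it in an instance field, and the punctuator it schedules uses that same field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a task-local key-value state store.
class StoreStub {
    final Map<String, Long> data = new HashMap<>();
}

// Hypothetical stand-in for ProcessorContext: one per task; it hands out
// that task's stores and collects scheduled punctuators.
class ContextStub {
    private final Map<String, StoreStub> stores = new HashMap<>();
    final List<Runnable> punctuators = new ArrayList<>();

    ContextStub(String storeName) {
        stores.put(storeName, new StoreStub());
    }

    StoreStub getStateStore(String name) {
        return stores.get(name);
    }

    void schedule(Runnable punctuator) {
        punctuators.add(punctuator);
    }
}

class StoreBackedTransformer {
    static final String STORE_NAME = "counts"; // hypothetical store name

    // Instance field: this transformer only ever sees its own task's store.
    private StoreStub store;

    void init(ContextStub context) {
        // Obtain the store from *this* instance's context...
        store = context.getStateStore(STORE_NAME);
        // ...and have the punctuator use the same instance's store.
        context.schedule(() -> store.data.merge("punctuations", 1L, Long::sum));
    }

    void transform(String key) {
        store.data.merge(key, 1L, Long::sum);
    }
}

class StoreScopeDemo {
    // Two tasks, each with its own context/store and transformer instance.
    // Returns {countA, countB, punctuationsA, punctuationsB}.
    static long[] run() {
        ContextStub ctxA = new ContextStub(StoreBackedTransformer.STORE_NAME);
        ContextStub ctxB = new ContextStub(StoreBackedTransformer.STORE_NAME);
        StoreBackedTransformer a = new StoreBackedTransformer();
        StoreBackedTransformer b = new StoreBackedTransformer();
        a.init(ctxA);
        b.init(ctxB);
        a.transform("k");
        a.transform("k");
        b.transform("k");
        ctxA.punctuators.forEach(Runnable::run); // fire task A's punctuator only
        StoreStub storeA = ctxA.getStateStore(StoreBackedTransformer.STORE_NAME);
        StoreStub storeB = ctxB.getStateStore(StoreBackedTransformer.STORE_NAME);
        return new long[] {
            storeA.data.get("k"),
            storeB.data.get("k"),
            storeA.data.getOrDefault("punctuations", 0L),
            storeB.data.getOrDefault("punctuations", 0L)
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // prints [2, 1, 1, 0]
    }
}
```

Because the store reference never leaves the instance, task A's records and punctuation touch only task A's store, and task B's store is unaffected.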
I'm sure about the first statement, but intuitively I think the second
should also hold. The Processor/Transformer API is structured in a way
that encourages such usage, but it can't prevent invalid usages such as
Eric's.
What Eric did wrong was to assign a reference to a state store obtained in
Transformer.init() to a shared field outside the Transformer instance.
This is wrong in several ways, including:
- the field's value got overwritten each time a new Transformer instance
had its init() method invoked, keeping just the last value assigned
- the field might have been written from multiple threads while also
being read from multiple threads, which could lead to data races
(here I'm not considering the advice given by the additional statements
appended above)
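The first point (last value wins) can be reproduced in a few lines of plain Java. This sketch does not use Kafka Streams, and all class names are hypothetical. Two simulated tasks each pass their own store to init(). When the reference lives in a field of the supplier, the second init() overwrites it and task 0 ends up using task 1's store. Moving the field into the transformer instance fixes this. The sketch is single-threaded, so it shows only the overwrite, not the data race.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a task-local state store.
class TaskStore {
    final String owner;

    TaskStore(String owner) {
        this.owner = owner;
    }
}

// BROKEN: the store reference is a field of the supplier, i.e. outside the
// transformer instances, so every init() overwrites the same field.
class BrokenSupplier implements Supplier<BrokenSupplier.InnerTransformer> {
    private TaskStore store; // shared by every transformer this supplier creates

    class InnerTransformer {
        void init(TaskStore fromContext) { store = fromContext; }
        String storeOwner() { return store.owner; }
    }

    @Override
    public InnerTransformer get() { return new InnerTransformer(); }
}

// FIXED: each transformer instance holds its own store reference.
class FixedSupplier implements Supplier<FixedSupplier.InnerTransformer> {
    static class InnerTransformer {
        private TaskStore store; // one per transformer instance, i.e. per task

        void init(TaskStore fromContext) { store = fromContext; }
        String storeOwner() { return store.owner; }
    }

    @Override
    public InnerTransformer get() { return new InnerTransformer(); }
}

class SharedFieldDemo {
    // Two simulated tasks initialize their transformers with their own
    // stores; returns the owner of the store task 0 actually uses.
    static String brokenTask0Owner() {
        BrokenSupplier supplier = new BrokenSupplier();
        BrokenSupplier.InnerTransformer t0 = supplier.get();
        BrokenSupplier.InnerTransformer t1 = supplier.get();
        t0.init(new TaskStore("task-0"));
        t1.init(new TaskStore("task-1")); // overwrites the shared field
        return t0.storeOwner();
    }

    static String fixedTask0Owner() {
        FixedSupplier supplier = new FixedSupplier();
        FixedSupplier.InnerTransformer t0 = supplier.get();
        FixedSupplier.InnerTransformer t1 = supplier.get();
        t0.init(new TaskStore("task-0"));
        t1.init(new TaskStore("task-1"));
        return t0.storeOwner();
    }

    public static void main(String[] args) {
        System.out.println(brokenTask0Owner()); // task-1: the wrong store
        System.out.println(fixedTask0Owner());  // task-0
    }
}
```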
So my advice would be to follow correct programming practices in general
and let the API guide you.
Regards, Peter
On 1/3/19 9:50 PM, Matthias J. Sax wrote:
I see.
When updating the FAQ, it should be clear what you mean. Your current
proposal was unclear to me, and thus, it might be unclear to other
users, too.
-Matthias
On 1/3/19 9:13 PM, Eric Lalonde wrote:
On Jan 2, 2019, at 6:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
Thanks for reporting this. Feel free to edit the Wiki with the FAQ
directly.
What is unclear to me: what do you mean by "the state store [...] was
errantly scoped to the TransformerProvider, not the Transformer" ?
I would like to understand the actual issue.
See this gist:
https://gist.github.com/elalonde/5b7be53973b5b847bac86754077b1a80#file-gistfile1-txt-L4
Because MyStore is declared in the parent supplier, it will be shared
across tasks, even though the get() method instantiates a new
MyTransformer() for each task. It should have been declared in the
inner MyTransformer class instead (around line 15 of the gist).
-Matthias
On 12/31/18 2:36 AM, Eric Lalonde wrote:
Recently I encountered IllegalStateExceptions in my KafkaTransformer. My
application was exhibiting all of the behavior as discussed in the FAQ
(https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the
section:
Why do I get an IllegalStateException when accessing record metadata?
However, the root cause of my problem was not a failure to instantiate a
new transformer for each task. The root cause of my mistake was a bit more
insidious: the state store that I instantiated was errantly scoped to the
TransformerProvider, not the Transformer. Of course, once I finally
realized my error, the problem was obvious, but it was not obvious at
first. May I suggest extending the FAQ to help others as well? Perhaps it
would be helpful to extend the aforementioned FAQ section in the following
way (my addition is the text starting with "Additionally"):
Why do I get an IllegalStateException when accessing record metadata?
If you attach a new Processor/Transformer/ValueTransformer to your
topology using a corresponding supplier, you need to make sure that
the supplier returns a new instance each time get() is called.
If you return the same object, a single
Processor/Transformer/ValueTransformer would be shared over
multiple tasks, resulting in an IllegalStateException with error
message "This should not happen as topic() should only be called
while a record is processed" (depending on the method you are
calling, it could also be partition(), offset(), or timestamp()
instead of topic()). Additionally, all instantiated state stores
must be scoped to the inner Processor/Transformer/ValueTransformer
class, and not to the parent supplier class. Scoping state stores to
the parent class will result in state store re-use across tasks,
which will also result in IllegalStateExceptions.
Hope this saves someone else from making the same mistake :)
- Eric