Recently I encountered IllegalStateExceptions in my KafkaTransformer. My
application exhibited all of the behavior discussed in the FAQ
(https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the section:

Why do I get an IllegalStateException when accessing record metadata? 

However, the root cause of my problem was not a failure to instantiate a new
transformer for each task. The root cause was a bit more insidious: the state
store that I instantiated was mistakenly scoped to the TransformerProvider, not
the Transformer. Once I finally realized my error, the problem seemed obvious,
but it was not obvious at first.
May I suggest extending the FAQ to help others as well? The aforementioned FAQ
section could be extended as follows (my addition is the last two sentences,
beginning with "Additionally"):

Why do I get an IllegalStateException when accessing record metadata? 

If you attach a new Processor/Transformer/ValueTransformer to your topology 
using a corresponding supplier, you need to make sure that the supplier returns 
a new instance each time get() is called. If you return the same object, a
single Processor/Transformer/ValueTransformer would be shared over multiple
tasks, resulting in an IllegalStateException with the error message "This should
not happen as topic() should only be called while a record is processed"
(depending on the method you are calling, it could also be partition(),
offset(), or timestamp() instead of topic()). Additionally, all state stores
must be scoped to the inner Processor/Transformer/ValueTransformer class, not
to the enclosing Supplier class. Scoping state stores to the Supplier will
result in state store reuse across tasks, which will also result in
IllegalStateExceptions.
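To make the difference concrete, here is a minimal, self-contained Java sketch of the two scopings. It deliberately avoids the real Kafka Streams classes: FakeStore, FakeTransformer, BuggySupplier, FixedSupplier and run() are hypothetical stand-ins, and java.util.function.Supplier plays the role of TransformerSupplier. It does not reproduce the IllegalStateException itself; it only shows that when the store field lives on the supplier, every transformer ends up using whichever task's store was initialized last.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class StoreScopingDemo {

    // Hypothetical stand-in for a task's state store; NOT the Kafka Streams API.
    static final class FakeStore {
        final String ownerTask;
        final Map<String, Long> counts = new HashMap<>();

        FakeStore(String ownerTask) {
            this.ownerTask = ownerTask;
        }
    }

    // Hypothetical stand-in for Transformer: init() hands over the task's own store.
    interface FakeTransformer {
        void init(FakeStore taskStore);

        // Counts the key and returns the owner of the store that was written.
        String transform(String key);
    }

    // BUGGY: the store field lives on the supplier, so all transformers share it.
    static final class BuggySupplier implements Supplier<FakeTransformer> {
        private FakeStore store; // supplier-scoped: each init() overwrites it

        @Override
        public FakeTransformer get() {
            return new FakeTransformer() {
                @Override
                public void init(FakeStore taskStore) {
                    store = taskStore;
                }

                @Override
                public String transform(String key) {
                    store.counts.merge(key, 1L, Long::sum);
                    return store.ownerTask;
                }
            };
        }
    }

    // FIXED: each transformer instance keeps its own store reference.
    static final class FixedSupplier implements Supplier<FakeTransformer> {
        @Override
        public FakeTransformer get() {
            return new FakeTransformer() {
                private FakeStore store; // transformer-scoped: one per task

                @Override
                public void init(FakeStore taskStore) {
                    store = taskStore;
                }

                @Override
                public String transform(String key) {
                    store.counts.merge(key, 1L, Long::sum);
                    return store.ownerTask;
                }
            };
        }
    }

    // Simulates two tasks, each creating its own transformer and its own store.
    static String[] run(Supplier<FakeTransformer> supplier) {
        FakeTransformer task0 = supplier.get();
        FakeTransformer task1 = supplier.get();
        task0.init(new FakeStore("task-0"));
        task1.init(new FakeStore("task-1"));
        return new String[] {task0.transform("a"), task1.transform("b")};
    }

    public static void main(String[] args) {
        String[] buggy = run(new BuggySupplier());
        String[] fixed = run(new FixedSupplier());
        System.out.println("buggy: " + buggy[0] + ", " + buggy[1]); // buggy: task-1, task-1
        System.out.println("fixed: " + fixed[0] + ", " + fixed[1]); // fixed: task-0, task-1
    }
}
```

In real Kafka Streams code, the equivalent fix is to declare the store field inside the Transformer class and assign it in init(ProcessorContext), for example via context.getStateStore(...), so that each task's transformer holds its own reference.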

Hope this saves someone else from making the same mistake :)

- Eric
