Recently I encountered IllegalStateExceptions in my KafkaTransformer. My application was exhibiting all of the behavior discussed in the FAQ (https://cwiki.apache.org/confluence/display/KAFKA/FAQ), under the section:
Why do I get an IllegalStateException when accessing record metadata?

However, the root cause of my problem was not a failure to instantiate a new transformer for each task. It was a bit more insidious: the state store that I instantiated was errantly scoped to the TransformerProvider, not the Transformer. Once I finally realized my error the problem was obvious, but it was far from obvious beforehand.

May I suggest extending the FAQ to help others as well? Perhaps the aforementioned FAQ section could be extended in the following way (the final paragraph, beginning with "Additionally", is my addition):

Why do I get an IllegalStateException when accessing record metadata?

If you attach a new Processor/Transformer/ValueTransformer to your topology using a corresponding supplier, you need to make sure that the supplier returns a new instance each time get() is called. If you return the same object, a single Processor/Transformer/ValueTransformer would be shared over multiple tasks, resulting in an IllegalStateException with error message "This should not happen as topic() should only be called while a record is processed" (depending on the method you are calling it could also be partition(), offset(), or timestamp() instead of topic()).

Additionally, all instantiated state stores must be scoped to the inner Processor/Transformer/ValueTransformer class, and not to the parent Provider class. Scoping state stores to the parent class will result in state store re-use across tasks, which will also result in IllegalStateExceptions.

Hope this saves someone else from making the same mistake :)

- Eric
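P.S. In case it helps to see the scoping mistake in code, here is a minimal sketch. It uses plain Java stand-ins rather than the Kafka Streams API: CountingTransformer, SharedStoreSupplier, PerInstanceStoreSupplier and distinctStores are all hypothetical names, and a HashMap stands in for the state store. In a real Transformer the store would more likely be looked up inside init() from the ProcessorContext, but the scoping issue should be the same.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

public class StoreScopingSketch {

    /** Hypothetical stand-in for a per-task transformer that holds a state store. */
    static class CountingTransformer {
        private final Map<String, Long> store;

        CountingTransformer(Map<String, Long> store) {
            this.store = store;
        }

        Map<String, Long> store() {
            return store;
        }
    }

    /** The mistake: the store is a field of the supplier, so every transformer shares it. */
    static class SharedStoreSupplier implements Supplier<CountingTransformer> {
        private final Map<String, Long> store = new HashMap<>();

        @Override
        public CountingTransformer get() {
            // A new transformer each time, but always wrapping the same store object.
            return new CountingTransformer(store);
        }
    }

    /** The fix: the store is created per transformer instance. */
    static class PerInstanceStoreSupplier implements Supplier<CountingTransformer> {
        @Override
        public CountingTransformer get() {
            return new CountingTransformer(new HashMap<>());
        }
    }

    /** Calls get() once per simulated task and counts the distinct store objects handed out. */
    static int distinctStores(Supplier<CountingTransformer> supplier, int tasks) {
        // Identity-based set: two empty HashMaps are equal() but are still different objects.
        Set<Map<String, Long>> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (int i = 0; i < tasks; i++) {
            seen.add(supplier.get().store());
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("shared: " + distinctStores(new SharedStoreSupplier(), 3));
        System.out.println("per-instance: " + distinctStores(new PerInstanceStoreSupplier(), 3));
    }
}
```

With three simulated tasks, the first supplier hands out a single store object and the second hands out three. Note that the first supplier does return a new transformer on every get() call, which is why the existing FAQ advice alone did not point me at the problem.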