Hi Everyone,
I have a question about automatic message reprocessing in Kafka Streams and the
Transformer/Processor init()/close() methods.
Is it possible that in some scenarios (failures, rebalances, etc.) a message is
processed twice by Transformer.transform() without Transformer.init() being
called between the first and the second processing?
To illustrate my question with an example: let's say I have a Kafka Streams
application with the default at_least_once processing guarantee. The topology
is as follows: read messages from one topic, apply a Transformer, and produce
messages to another topic. The Transformer:
    new TransformerSupplier<K, V, KeyValue<K, V>>() {
        @Override
        public Transformer<K, V, KeyValue<K, V>> get() {
            return new Transformer<K, V, KeyValue<K, V>>() {
                private ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    logger.info("init() called.");
                    this.context = context;
                }

                @Override
                public KeyValue<K, V> transform(K key, V value) {
                    // Log the position of the record currently being processed.
                    logger.info("Transform offset: {}, partition: {}",
                            context.offset(), context.partition());
                    return new KeyValue<>(key, value);
                }

                @Override
                public void close() {
                    logger.info("close() called.");
                }
            };
        }
    }
Is it possible that the app will log "Transform offset: x, partition: y" twice
with the same x and y values without logging "init() called." in between
(chronologically)?
I tried to simulate some failure scenarios (for example, when a
CommitFailedException is thrown), but in my simple test cases init() was always
called between the first and the second processing of a message. However, I
could easily have missed a scenario where this does not hold, which is why I'm
asking this question here.
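To make such tests flag the condition automatically instead of relying on reading the logs, something along these lines could be used. This is only a rough sketch: ReprocessingDetector, onInit() and onTransform() are hypothetical names and not part of Kafka Streams. It assumes one detector per Transformer instance and that offsets within a partition only increase between two init() calls.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical test helper; not part of the Kafka Streams API. */
final class ReprocessingDetector {
    // Last offset seen per partition since the most recent init() call.
    private final Map<Integer, Long> lastOffsetSinceInit = new HashMap<>();

    /** Intended to be called from Transformer.init(). */
    void onInit() {
        lastOffsetSinceInit.clear();
    }

    /**
     * Intended to be called from Transformer.transform() with
     * context.partition() and context.offset().
     * Returns true if this offset was already reached on this partition
     * without an init() call in between.
     */
    boolean onTransform(int partition, long offset) {
        Long previous = lastOffsetSinceInit.put(partition, offset);
        return previous != null && offset <= previous;
    }
}
```

In the Transformer above, init() would call onInit(), and transform() would log a warning whenever onTransform(context.partition(), context.offset()) returns true. Because the state lives in a single instance, this only catches repeats seen by the same Transformer object.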
Sorry if my question is not clear. If so, please let me know and I'll
try to clarify.
Thank you for any help you can provide.
Regards,
Tomasz