I upgraded to Kafka Streams 3.0.0 with a positive task.max.idle.ms, and it did not help. When the lag is large, the application still consumes data in batches without interleaving.
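For reference, here is a minimal sketch of passing the idle setting to Kafka Streams, assuming a plain java.util.Properties configuration. The property key is max.task.idle.ms (the constant StreamsConfig.MAX_TASK_IDLE_MS_CONFIG). A key that Streams does not recognize is not a Streams setting and does not change task idling. The application id, bootstrap servers, and the 1000 ms value are hypothetical placeholders.

```java
import java.util.Properties;

public class StreamsIdleConfig {
    // Builds Kafka Streams properties using string keys.
    // "max.task.idle.ms" is the key behind StreamsConfig.MAX_TASK_IDLE_MS_CONFIG.
    public static Properties build(long maxTaskIdleMs) {
        Properties props = new Properties();
        // Hypothetical placeholders: replace with your own application id and brokers.
        props.put("application.id", "interleave-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // How long a task may wait for data on an input partition that has
        // nothing buffered before it processes the partitions that do.
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(1000L);
        System.out.println("max.task.idle.ms = " + props.getProperty("max.task.idle.ms"));
    }
}
```

The returned Properties would then be passed to the KafkaStreams constructor together with the topology.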
On Tue, 27 Sep 2022 at 05:51, John Roesler <vvcep...@apache.org> wrote:
> Hi Tomasz,
>
> Thanks for asking. This sounds like the situation that we fixed in Apache
> Kafka 3.0, with KIP-695 (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization
> ).
>
> Can you try upgrading and let us know if this fixes the problem?
>
> Thanks,
> -John
>
> On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
> > Hi group,
> >
> > I wrote a simple Kafka Streams application with a topology like the one below:
> >
> >     builder.addStateStore(
> >         Stores.keyValueStoreBuilder(
> >             Stores.persistentKeyValueStore("STORE"),
> >             Serdes.String(), Serdes.String())
> >         .withLoggingEnabled(storeConfig));
> >
> >     builder.stream("TOPIC_1", Consumed.with(...))
> >         .merge(builder.stream("TOPIC_2", Consumed.with(...)))
> >         .merge(builder.stream("TOPIC_3", Consumed.with(...)))
> >         .map(...)                 // stateless
> >         .transform(..., "STORE")  // stateful
> >         .to("TOPIC_4");
> >
> > All input topics have 6 partitions, and for the purpose of testing, we are
> > producing data to partition number 5.
> > We are using Kafka Streams 2.8.1 and broker version 2.12-2.1.1.
> >
> > The application works as expected once it has caught up with the lag, e.g.
> > when the reset tool is used with the --to-latest parameter.
> > However, when the application processes the messages starting from the
> > earliest offset, the inputs are provided in batches such as:
> >
> > - ~1000 messages from TOPIC_1
> > - ~1000 messages from TOPIC_2
> > - ~1000 messages from TOPIC_3
> >
> > All of the messages have timestamps provided in headers, so I would expect
> > the application to interleave the messages from these three topics so that
> > their timestamps are in ascending order.
> > However, this is not what I am observing. The messages are
> > processed in batches.
> >
> > How do I configure my application so that it processes messages in order
> > when it is catching up with the lag?
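To illustrate the interleaving Tomasz expects, here is a deliberately simplified model (not Kafka Streams' actual code) of one task reading the three input partitions and always processing the buffered record with the smallest timestamp. It assumes hypothetical data in which each poll delivers a whole batch for a single topic. Processing as soon as anything is buffered reproduces the batched order, while holding back until every input has buffered data yields ascending timestamps. In Kafka Streams the wait decision is governed by max.task.idle.ms and, from 3.0 on, by the lag information described in KIP-695.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TimestampMergeModel {
    // Simplified model of one task with three input buffers (TOPIC_1..TOPIC_3).
    // Each "poll" delivers a whole batch for one topic, in topic order.
    // waitForAll == false: process whatever is buffered immediately.
    // waitForAll == true: process only while every buffer is non-empty,
    // or drain the rest once no more data will arrive.
    public static List<Long> run(boolean waitForAll) {
        int topics = 3;
        List<Deque<Long>> buffers = new ArrayList<>();
        for (int i = 0; i < topics; i++) {
            buffers.add(new ArrayDeque<>());
        }
        List<Long> processed = new ArrayList<>();

        for (int poll = 0; poll < topics; poll++) {
            // Hypothetical data: topic i holds timestamps i+1, i+4, i+7.
            for (int k = 0; k < 3; k++) {
                buffers.get(poll).addLast(poll + 1 + 3L * k);
            }
            boolean lastPoll = poll == topics - 1;

            while (true) {
                boolean allNonEmpty = buffers.stream().noneMatch(Deque::isEmpty);
                boolean anyNonEmpty = buffers.stream().anyMatch(b -> !b.isEmpty());
                boolean mayProcess = waitForAll
                        ? (allNonEmpty || (lastPoll && anyNonEmpty))
                        : anyNonEmpty;
                if (!mayProcess) {
                    break;
                }
                // Choose the buffer whose head record has the smallest timestamp.
                Deque<Long> best = null;
                for (Deque<Long> b : buffers) {
                    if (!b.isEmpty() && (best == null || b.peekFirst() < best.peekFirst())) {
                        best = b;
                    }
                }
                processed.add(best.pollFirst());
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("process immediately: " + run(false));
        System.out.println("wait for all inputs: " + run(true));
    }
}
```

Running main prints [1, 4, 7, 2, 5, 8, 3, 6, 9] for the first mode (one batch per topic) and [1, 2, 3, 4, 5, 6, 7, 8, 9] for the second (interleaved by timestamp).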