I upgraded to Kafka Streams 3.0.0 and set a positive max.task.idle.ms, but it
did not help.
When the lag is large, the application still consumes data in batches without
interleaving.
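For reference, here is a minimal sketch of the relevant setting. It uses plain string keys and java.util.Properties so it compiles without the kafka-streams jar. The property key is max.task.idle.ms (StreamsConfig.MAX_TASK_IDLE_MS_CONFIG). The application id, the broker address and the 1000 ms value are placeholders, not values from this thread.

```java
import java.util.Properties;

public class IdleConfigSketch {
    // Builds Streams properties with plain string keys so the sketch compiles
    // without kafka-streams on the classpath. A real application would use the
    // StreamsConfig constants instead (e.g. StreamsConfig.MAX_TASK_IDLE_MS_CONFIG).
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "interleave-test");    // hypothetical id
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        // The key is "max.task.idle.ms" (not "task.max.idle.ms"). It bounds how
        // long a task may stay idle, waiting for more data on input partitions
        // that have nothing buffered, before processing possibly out of order.
        props.put("max.task.idle.ms", "1000");             // assumed value
        return props;
    }

    public static void main(String[] args) {
        streamsProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In a real application, these properties would be passed to the KafkaStreams constructor together with the built Topology.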



On Tue, Sep 27, 2022 at 05:51, John Roesler <vvcep...@apache.org> wrote:

> Hi Tomasz,
>
> Thanks for asking. This sounds like the situation that we fixed in Apache
> Kafka 3.0 with KIP-695:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization
>
> Can you try upgrading and let us know if this fixes the problem?
>
> Thanks,
> -John
>
> On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
> > Hi group,
> >
> > I wrote a simple Kafka Streams application with the following topology:
> >
> >     builder.addStateStore(
> >         Stores.keyValueStoreBuilder(
> >                 Stores.persistentKeyValueStore("STORE"),
> >                 Serdes.String(), Serdes.String())
> >             .withLoggingEnabled(storeConfig));
> >
> >     builder.stream("TOPIC_1", Consumed.with(...))
> >         .merge(builder.stream("TOPIC_2", Consumed.with(...)))
> >         .merge(builder.stream("TOPIC_3", Consumed.with(...)))
> >         .map(...)                 // stateless
> >         .transform(..., "STORE")  // stateful, uses the "STORE" state store
> >         .to("TOPIC_4");
> >
> >
> > All input topics have 6 partitions, and for the purpose of testing, we are
> > producing data to partition number 5.
> > We are using Kafka Streams version 2.8.1 and broker version 2.12-2.1.1.
> >
> > The application works as expected once it has caught up on the lag, e.g.,
> > when the reset tool is used with the --to-latest parameter.
> > However, when the application processes messages starting from the
> > earliest offset, the inputs arrive in batches such as:
> >
> >    - ~1000 messages from TOPIC_1
> >    - ~1000 messages from TOPIC_2
> >    - ~1000 messages from TOPIC_3
> >
> > All of the messages have timestamps provided in headers, so I would expect
> > the application to interleave the messages from these three topics so that
> > they are processed in ascending timestamp order.
> > However, that is not what I am observing: the messages are processed in
> > batches.
> >
> > How do I configure my application so that it processes messages in order
> > while it is catching up on the lag?
>
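To make the expected behaviour concrete, here is a simplified, hypothetical model of timestamp synchronization across input partitions. At each step, the record with the smallest head timestamp among the non-empty buffers is processed next. This is not Kafka Streams' internal code; the class and method names are invented for illustration. It also assumes that the header timestamps are what Streams sees as each record's timestamp, for example via a custom TimestampExtractor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TimestampMergeSketch {

    // One buffered input record: the topic it came from and its timestamp (ms).
    static final class Rec {
        final String topic;
        final long ts;
        Rec(String topic, long ts) { this.topic = topic; this.ts = ts; }
        @Override public String toString() { return topic + "@" + ts; }
    }

    // Simplified model: repeatedly take the head record with the smallest
    // timestamp among all non-empty buffers. Each buffer must already be in
    // timestamp order, like a single partition. Ties go to the earliest buffer
    // in the list. The buffers are drained as a side effect.
    static List<Rec> mergeByTimestamp(List<Deque<Rec>> buffers) {
        List<Rec> out = new ArrayList<>();
        while (true) {
            Deque<Rec> next = null;
            for (Deque<Rec> b : buffers) {
                if (!b.isEmpty() && (next == null || b.peekFirst().ts < next.peekFirst().ts)) {
                    next = b;
                }
            }
            if (next == null) {
                return out; // every buffer is empty
            }
            out.add(next.pollFirst());
        }
    }

    // Helper that builds one topic's buffer from ascending timestamps.
    static Deque<Rec> topic(String name, long... timestamps) {
        Deque<Rec> d = new ArrayDeque<>();
        for (long ts : timestamps) {
            d.addLast(new Rec(name, ts));
        }
        return d;
    }

    public static void main(String[] args) {
        List<Deque<Rec>> buffers = List.of(
                topic("TOPIC_1", 1, 4, 7),
                topic("TOPIC_2", 2, 5, 8),
                topic("TOPIC_3", 3, 6, 9));
        System.out.println(mergeByTimestamp(buffers));
    }
}
```

Running main prints the three topics interleaved: [TOPIC_1@1, TOPIC_2@2, TOPIC_3@3, TOPIC_1@4, TOPIC_2@5, TOPIC_3@6, TOPIC_1@7, TOPIC_2@8, TOPIC_3@9]. If only one buffer is non-empty whenever a choice is made, the same loop emits one topic's records back to back, which is the batch pattern described above.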
