Messages are processed in timestamp order across topics. Thus, if the
records in your `KTable` topic have smaller timestamps than the records
in the other topics, they will be processed first and effectively
bootstrap your KTable state store.

You may want to increase the `max.task.idle.ms` parameter to get stricter
guarantees for this case.
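As a rough illustration, the parameter is set like any other Streams config. This is a minimal sketch using plain `java.util.Properties`: the key string matches the `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG` constant, while the application id, the bootstrap servers, and the 5000 ms value are placeholder assumptions, not recommendations.

```java
import java.util.Properties;

public class StreamsConfigSketch {
    // Builds a Kafka Streams config; "my-app" and "localhost:9092" are placeholders.
    static Properties buildConfig(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("application.id", "my-app");             // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", "localhost:9092");  // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // StreamsConfig.MAX_TASK_IDLE_MS_CONFIG: how long a task may stay idle waiting
        // for data on an input partition with no buffered records, instead of
        // processing its other partitions out of timestamp order.
        props.put("max.task.idle.ms", Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig(5_000L);
        System.out.println(props.getProperty("max.task.idle.ms")); // prints 5000
    }
}
```

These `Properties` would then be handed to the `KafkaStreams` constructor together with the topology.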

During processing, if new records arrive in the KTable topic, they will
also be processed according to their timestamps relative to the records
from the other topic.
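The timestamp-ordered processing described above (both for the initial bootstrap and for KTable records that arrive later) can be modeled with a toy merge of two buffers. This is a simplified sketch, not Kafka Streams' actual implementation: the `Rec` class and the tie-breaking rule (ties go to the table) are assumptions made for illustration, and all records are assumed to be buffered up front.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class TimestampOrderSketch {
    // Hypothetical stand-in for a consumed record: source topic, timestamp, payload.
    static final class Rec {
        final String topic;
        final long timestamp;
        final String value;

        Rec(String topic, long timestamp, String value) {
            this.topic = topic;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    static Deque<Rec> buffer(Rec... recs) {
        return new ArrayDeque<>(Arrays.asList(recs));
    }

    // Drains both buffers, always taking the head record with the smaller timestamp.
    // Ties go to the table buffer (an arbitrary choice for this sketch).
    static List<String> process(Deque<Rec> table, Deque<Rec> stream) {
        List<String> order = new ArrayList<>();
        while (!table.isEmpty() || !stream.isEmpty()) {
            boolean takeTable = stream.isEmpty()
                    || (!table.isEmpty()
                        && table.peekFirst().timestamp <= stream.peekFirst().timestamp);
            Rec next = takeTable ? table.pollFirst() : stream.pollFirst();
            order.add(next.topic + ":" + next.value);
        }
        return order;
    }

    public static void main(String[] args) {
        // a and b are older than every stream record, so they bootstrap the table;
        // c (ts 7) is interleaved with the stream records by timestamp.
        Deque<Rec> table = buffer(new Rec("table", 1, "a"), new Rec("table", 2, "b"), new Rec("table", 7, "c"));
        Deque<Rec> stream = buffer(new Rec("stream", 3, "x"), new Rec("stream", 5, "y"), new Rec("stream", 9, "z"));
        System.out.println(process(table, stream));
        // prints [table:a, table:b, stream:x, stream:y, table:c, stream:z]
    }
}
```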

If you want to give those KTable records "highest priority", you could
additionally use a custom `TimestampExtractor` that always returns `0` as
the timestamp, and use this extractor only for this topic/KTable (passed in
via the `Consumed` parameter).
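To see why a constant `0` gives the table topic priority, the toy merge can take a per-topic "extractor" function. In real code this role is played by an implementation of Kafka Streams' `TimestampExtractor` (its `extract(ConsumerRecord<Object, Object>, long)` method returning `0L`) passed via `Consumed.withTimestampExtractor(...)`; the `ToLongFunction` below is only a stdlib stand-in, and `Rec`, `RECORD_TIME`, and `ALWAYS_ZERO` are hypothetical names for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.ToLongFunction;

public class ZeroTimestampSketch {
    // Hypothetical stand-in for a consumed record.
    static final class Rec {
        final String topic;
        final long timestamp;
        final String value;

        Rec(String topic, long timestamp, String value) {
            this.topic = topic;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Stand-ins for timestamp extractors: use the record's own timestamp, or always 0.
    static final ToLongFunction<Rec> RECORD_TIME = r -> r.timestamp;
    static final ToLongFunction<Rec> ALWAYS_ZERO = r -> 0L;

    static Deque<Rec> buffer(Rec... recs) {
        return new ArrayDeque<>(Arrays.asList(recs));
    }

    // Same smallest-timestamp-first merge, but each topic has its own extractor.
    static List<String> process(Deque<Rec> table, ToLongFunction<Rec> tableTime,
                                Deque<Rec> stream, ToLongFunction<Rec> streamTime) {
        List<String> order = new ArrayList<>();
        while (!table.isEmpty() || !stream.isEmpty()) {
            boolean takeTable = stream.isEmpty()
                    || (!table.isEmpty()
                        && tableTime.applyAsLong(table.peekFirst())
                           <= streamTime.applyAsLong(stream.peekFirst()));
            Rec next = takeTable ? table.pollFirst() : stream.pollFirst();
            order.add(next.topic + ":" + next.value);
        }
        return order;
    }

    public static void main(String[] args) {
        Deque<Rec> table = buffer(new Rec("table", 1, "a"), new Rec("table", 2, "b"), new Rec("table", 7, "c"));
        Deque<Rec> stream = buffer(new Rec("stream", 3, "x"), new Rec("stream", 5, "y"), new Rec("stream", 9, "z"));
        // With ALWAYS_ZERO, every buffered table record wins against any stream record.
        System.out.println(process(table, ALWAYS_ZERO, stream, RECORD_TIME));
        // prints [table:a, table:b, table:c, stream:x, stream:y, stream:z]
    }
}
```

In a running application only records that are already buffered compete, so a table record that arrives later still jumps ahead of the buffered stream records rather than ahead of ones not yet fetched.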

If you want to avoid re-reading the topic on restart, you should use a
persistent store instead of an in-memory store.

Shameless plug: Check out my Kafka Summit talks about this topic for
more details: https://kafka.apache.org/videos

 - The Flux Capacitor of Kafka Streams and ksqlDB
 - What’s the time? …and why?


-Matthias


On 11/20/20 7:24 AM, Tomer Cohen wrote:
> Hello everyone
> 
> I have the following use case:
> 
> I have two Kafka topics, one is meant to be used as a stream of incoming
> messages to be processed, the other is meant as a store of records that is
> meant to be used as a bootstrap to the initial state of the application.
> 
> Is there a way to do the following:
> 
> 1. Read all messages from a Kafka topic when the application starts up and
> store all ConsumerRecord in memory from the topic that is meant to
> bootstrap the application to its initial state
> 2. Only after all messages have been read allow the ConsumerRecord from the
> stream topic to be processed
> 3. As there may be additional records on the state topic, incorporate
> them into the application's state while the application is running, without
> having to restart the application.
> 
> The issue I am having is: how do I read the topic in its entirety and
> populate the KTable? Every time that my application starts in order to
> reach its initial state I need to consume all the messages that are on the
> topic from the very beginning prior to my application processing messages
> from another topic.
> 
> Thanks!
> 
