Just to make sure I understand the logs, you're saying the "new file
processed" lines represent store queries, and presumably the
com.osr.serKafkaStreamsService is your service that's issuing these queries?

You need to wait for the app to finish restoring state before querying it.
Based on this message -- "KafkaStreams has not been started, you can retry
after calling start()" -- I assume you're kicking off the querying service
right away and blocking queries until after KafkaStreams#start is called.
But you need to wait for it to actually finish starting up, not just for
start() to be called. The best way to do this is to register a state
listener via KafkaStreams#setStateListener, use it to track the
KafkaStreams.State, and block queries until the state has changed to
RUNNING.
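A minimal sketch of that approach, assuming your own code creates the KafkaStreams instance and decides when start() is called. RunningStateGate and startAndAwaitRunning are hypothetical names used only for illustration; KafkaStreams#setStateListener, KafkaStreams.StateListener, KafkaStreams#state() and KafkaStreams.State are the actual Kafka Streams API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;

// Hypothetical helper, not part of Kafka Streams: opens a latch the first time
// the client reports RUNNING, i.e. after (global) state restoration has finished.
final class RunningStateGate implements KafkaStreams.StateListener {
    private final CountDownLatch running = new CountDownLatch(1);

    @Override
    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        if (newState == KafkaStreams.State.RUNNING) {
            running.countDown();
        }
    }

    // Returns true once RUNNING has been observed, false if the timeout elapsed
    // first (for example because the client went to ERROR instead).
    boolean awaitRunning(long timeout, TimeUnit unit) throws InterruptedException {
        return running.await(timeout, unit);
    }

    // Registers the gate, starts the client and blocks until RUNNING or timeout.
    static RunningStateGate startAndAwaitRunning(KafkaStreams streams, long timeout, TimeUnit unit)
            throws InterruptedException {
        RunningStateGate gate = new RunningStateGate();
        // setStateListener is only allowed while the client is still CREATED,
        // so it must be called before start(). It replaces any earlier listener.
        streams.setStateListener(gate);
        streams.start();
        if (!gate.awaitRunning(timeout, unit)) {
            throw new IllegalStateException(
                "KafkaStreams did not reach RUNNING within " + timeout + " " + unit
                    + " (current state: " + streams.state() + ")");
        }
        return gate;
    }
}
```

Only after startAndAwaitRunning returns would the service call streams.store(...) and begin serving get() requests. The latch records only the first transition to RUNNING; if the client can later fall back to REBALANCING (for instance when it also runs regular stream threads), checking streams.state() before each query is still worthwhile.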

In case you're curious why this seems to work with in-memory stores but
not with RocksDB: it looks like in the in-memory case, the queries that are
attempted during restoration are rejected because the store is still closed
(according to "(Quarkus Main Thread) the state store, store-name, is not
open.")

So why is the store closed for most of the restoration in the in-memory
case only? This gets a bit into the weeds, but it has to do with the
sequence of events in starting up a state store. When the global thread
starts up, it'll first loop over all its state stores and call #init on
them. Two things have to happen inside #init: the store is opened, and the
store registers itself with the ProcessorContext. The #register step involves
various things, including a call to fetch the end offsets of the topic for
global state stores. This is a blocking call, so the store might stay
inside the #register call for a relatively long time.

For RocksDB stores, we open the store first and then call #register, so by
the time the GlobalStreamThread is sitting around waiting on the end
offsets response, the store is open and your queries are getting through to
it. However, the in-memory store actually registers itself *first*, before
marking itself as open, so it remains closed for most of the time it
spends in restoration and rejects any query attempts during that time.
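To make the ordering difference concrete, here is a toy model in plain Java. It is not Kafka Streams code: InitOrderModel, ModelStore, rocksDbLikeInit and inMemoryLikeInit are hypothetical names, and register() simply stands in for the blocking end-offsets fetch by returning what a query arriving during that window would see.

```java
// Toy model (hypothetical, NOT Kafka Streams code) of the two #init orderings.
final class InitOrderModel {

    // Minimal stand-in for a state store that is either open or closed.
    static final class ModelStore {
        private boolean open = false;

        // Mimics a query: it only reaches the store if the store is open.
        String query() {
            return open ? "reached store (restoration may be incomplete)"
                        : "rejected: store is not open";
        }
    }

    // Stand-in for #register: the real call blocks on the end-offsets fetch,
    // so we return what a concurrent query would observe during that window.
    private static String register(ModelStore store) {
        return store.query();
    }

    // RocksDB-like order: open the store first, then register.
    static String rocksDbLikeInit(ModelStore store) {
        store.open = true;
        return register(store);
    }

    // In-memory-like order: register first, then mark the store open.
    static String inMemoryLikeInit(ModelStore store) {
        String seenDuringRegister = register(store);
        store.open = true;
        return seenDuringRegister;
    }

    public static void main(String[] args) {
        System.out.println("RocksDB-like:   " + rocksDbLikeInit(new ModelStore()));
        System.out.println("in-memory-like: " + inMemoryLikeInit(new ModelStore()));
    }
}
```

In this model the RocksDB-like ordering lets a query through while registration is still blocked, whereas the in-memory-like ordering rejects it, which matches the two log traces in the original message.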

I suppose it would make sense to align the two store implementations to
have the same behavior, and the in-memory store is probably technically
more correct. But in the end you really should just wait for the
KafkaStreams.State to get to RUNNING before querying the state store, as
that's the only true guarantee.

Hope this helps!

-Sophie

On Tue, Nov 21, 2023 at 6:44 AM Christian Zuegner
<christian.zueg...@ams-osram.com.invalid> wrote:

> Hi,
>
> We have the following problem: a Kafka topic of ~20 MB is made
> available as a GlobalKTable for queries. When using RocksDB, the GlobalKTable
> is ready for queries instantly, even before the data has been read completely,
> and all get() requests return null. After a few seconds the data can be queried
> correctly, but by then it is too late for our application. Once we switch to
> IN_MEMORY, we get the expected behavior: the store is only ready after all
> data has been read from the topic.
>
> How can we achieve the same behavior with the RocksDB setup?
>
> Snippet to build the KafkaStreams topology:
>
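> builder.globalTable(
>     "topic-name",
>     Consumed.with(Serdes.String(), Serdes.String()),
>     // explicit type witness: Materialized.as(...) used as a method-call receiver
>     // cannot infer <String, String, KeyValueStore<Bytes, byte[]>> on its own
>     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
>         .withStoreType(Materialized.StoreType.ROCKS_DB)
> );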
>
> Querying the table:
>
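> while (true) {
>     try {
>         return streams.store(
>             StoreQueryParameters.fromNameAndType(
>                 FileCrawlerKafkaTopologyProducer.STORE_NAME,
>                 QueryableStoreTypes.keyValueStore()));
>     } catch (InvalidStateStoreException e) {
>         logger.warn(e.getMessage());
>         try {
>             Thread.sleep(3000);
>         } catch (InterruptedException ie) {
>             // restore the interrupt flag instead of silently swallowing it
>             Thread.currentThread().interrupt();
>             throw new IllegalStateException("Interrupted while waiting for the store", ie);
>         }
>     }
> }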
>
> The store is queried with getStore().get(key); this is where we get the
> null values.
>
> This is the log output with RocksDB; the first query happens before the state is RUNNING:
>
> ...
> 2023-11-21 15:15:40,629 INFO  [com.osr.serKafkaStreamsService] (Quarkus
> Main Thread) wait for kafka streams store to get ready: KafkaStreams has
> not been started, you can retry after calling start()
> 2023-11-21 15:15:41,781 INFO  [org.apa.kaf.str.KafkaStreams]
> (pool-10-thread-1) stream-client
> [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State transition from
> CREATED to REBALANCING
> 2023-11-21 15:15:41,819 INFO  
> [org.apa.kaf.str.sta.int.RocksDBTimestampedStore]
> (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread)
> Opening store store-name in regular mode
> 2023-11-21 15:15:41,825 INFO  [org.apa.kaf.str.pro.int.GlobalStateManagerImpl]
> (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread)
> global-stream-thread
> [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread]
> Restoring state for global store store-name
> 2023-11-21 15:15:43,753 INFO  [io.quarkus] (Quarkus Main Thread) demo
> 1.0-SNAPSHOT on JVM (powered by Quarkus 3.2.8.Final) started in 5.874s.
> 2023-11-21 15:15:43,754 INFO  [io.quarkus] (Quarkus Main Thread) Profile
> dev activated. Live Coding activated.
> 2023-11-21 15:15:43,756 INFO  [io.quarkus] (Quarkus Main Thread) Installed
> features: [apicurio-registry-avro, cdi, config-yaml, kafka-client,
> kafka-streams, logging-gelf, smallrye-context-propagation,
> smallrye-fault-tolerance, smallrye-reactive-messaging,
> smallrye-reactive-messaging-kafka, vertx]
> 2023-11-21 15:15:44,195 INFO  [com.osr.ser.KafkaStreamsService]
> (vert.x-worker-thread-1) new file processed
> 2023-11-21 15:15:44,629 INFO  [org.apa.kaf.str.pro.int.GlobalStreamThread]
> (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread)
> global-stream-thread
> [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread] State
> transition from CREATED to RUNNING
> 2023-11-21 15:15:44,631 INFO  [org.apa.kaf.str.KafkaStreams]
> (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread)
> stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State
> transition from REBALANCING to RUNNING
> 2023-11-21 15:15:44,631 INFO  [org.apa.kaf.str.KafkaStreams]
> (pool-10-thread-1) stream-client
> [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] Started 0 stream threads
> ...
>
> Once I configure StoreType.IN_MEMORY, no queries are made before the
> state is RUNNING:
>
> 2023-11-21 15:28:25,511 WARN  [com.osr.serKafkaStreamsService] (Quarkus
> Main Thread) KafkaStreams has not been started, you can retry after calling
> start()
> 2023-11-21 15:28:26,730 INFO  [org.apa.kaf.str.KafkaStreams]
> (pool-10-thread-1) stream-client
> [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State transition from
> CREATED to REBALANCING
> 2023-11-21 15:28:26,752 INFO  [org.apa.kaf.str.pro.int.GlobalStateManagerImpl]
> (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread)
> global-stream-thread
> [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread]
> Restoring state for global store store-name
> 2023-11-21 15:28:29,834 WARN  [com.osr.serKafkaStreamsService] (Quarkus
> Main Thread) the state store, store-name, is not open.
> 2023-11-21 15:28:33,670 WARN  [com.osr.serKafkaStreamsService] (Quarkus
> Main Thread) the state store, store-name, is not open.
> 2023-11-21 15:28:33,763 INFO  [org.apa.kaf.str.pro.int.GlobalStreamThread]
> (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread)
> global-stream-thread
> [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread] State
> transition from CREATED to RUNNING
> 2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams]
> (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread)
> stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State
> transition from REBALANCING to RUNNING
> 2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams]
> (pool-10-thread-1) stream-client
> [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] Started 0 stream threads
> 2023-11-21 15:28:36,774 INFO  [com.osr.serKafkaStreamsService]
> (vert.x-worker-thread-1) new file processed
>
>
> Thanks for any input!
> Christian
