Hi,

We have the following problem: a Kafka topic of about 20 MB is made available as
a GlobalKTable for queries. With RocksDB, the GlobalKTable accepts queries
immediately, even before all data has been read, and all get() requests return
null. After a few seconds the data can be queried correctly, but this is too
late for our application. Once we switch to IN_MEMORY we get the expected
behavior: the store only becomes queryable after all data has been read from
the topic.

How can we achieve the same behavior with the RocksDB setup?

Snippet to build the Kafka Streams topology:

builder.globalTable(
  "topic-name",
  Consumed.with(Serdes.String(), Serdes.String()),
  Materialized.as(STORE_NAME).withStoreType(Materialized.StoreType.ROCKS_DB)
);

Snippet to query the table:

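while (true) {
    try {
        return streams.store(
            StoreQueryParameters.fromNameAndType(
                FileCrawlerKafkaTopologyProducer.STORE_NAME,
                QueryableStoreTypes.keyValueStore()));
    } catch (InvalidStateStoreException e) {
        // Store not queryable yet (streams not started or store not open): retry
        logger.warn(e.getMessage());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for store", ie);
        }
    }
}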

The store is then queried with getStore().get(key); this is where the null values are returned.
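The retry loop above waits forever and only reacts to exceptions. A bounded alternative is to poll a readiness condition until it holds or a timeout expires. This is a minimal JDK-only sketch; the class Readiness and the method awaitCondition are hypothetical names. With Kafka Streams the condition could be something like () -> streams.state() == KafkaStreams.State.RUNNING, checked before calling streams.store(...). The RocksDB log suggests this would help, since the first query (15:15:44,195) ran before the REBALANCING to RUNNING transition (15:15:44,631), but that is an assumption drawn from the log, not a confirmed fix.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class Readiness {

    private Readiness() {}

    // Polls `ready` until it returns true or `timeout` elapses.
    // Returns true if the condition held, false on timeout or interruption
    // (in which case the thread's interrupt flag is restored).
    static boolean awaitCondition(BooleanSupplier ready, Duration timeout, Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (ready.getAsBoolean()) {
                return true;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            try {
                // Wait one poll interval, but never beyond the deadline
                TimeUnit.NANOSECONDS.sleep(Math.min(pollInterval.toNanos(), remaining));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated condition that becomes true on the third check
        int[] checks = {0};
        boolean ready = awaitCondition(() -> ++checks[0] >= 3, Duration.ofSeconds(1), Duration.ofMillis(10));
        System.out.println(ready + " after " + checks[0] + " checks");
        // Condition that never holds: returns false after roughly 50 ms
        System.out.println(awaitCondition(() -> false, Duration.ofMillis(50), Duration.ofMillis(10)));
    }
}
```

Returning false on timeout, instead of looping forever, leaves it to the caller to decide whether to fail startup or keep waiting.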

This is the log output with RocksDB; the first query is executed before the state is RUNNING:

...
2023-11-21 15:15:40,629 INFO  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) wait for kafka streams store to get ready: KafkaStreams has not been started, you can retry after calling start()
2023-11-21 15:15:41,781 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State transition from CREATED to REBALANCING
2023-11-21 15:15:41,819 INFO  [org.apa.kaf.str.sta.int.RocksDBTimestampedStore] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) Opening store store-name in regular mode
2023-11-21 15:15:41,825 INFO  [org.apa.kaf.str.pro.int.GlobalStateManagerImpl] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) global-stream-thread [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread] Restoring state for global store store-name
2023-11-21 15:15:43,753 INFO  [io.quarkus] (Quarkus Main Thread) demo 1.0-SNAPSHOT on JVM (powered by Quarkus 3.2.8.Final) started in 5.874s.
2023-11-21 15:15:43,754 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2023-11-21 15:15:43,756 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [apicurio-registry-avro, cdi, config-yaml, kafka-client, kafka-streams, logging-gelf, smallrye-context-propagation, smallrye-fault-tolerance, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
2023-11-21 15:15:44,195 INFO  [com.osr.ser.KafkaStreamsService] (vert.x-worker-thread-1) new file processed
2023-11-21 15:15:44,629 INFO  [org.apa.kaf.str.pro.int.GlobalStreamThread] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) global-stream-thread [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread] State transition from CREATED to RUNNING
2023-11-21 15:15:44,631 INFO  [org.apa.kaf.str.KafkaStreams] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State transition from REBALANCING to RUNNING
2023-11-21 15:15:44,631 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] Started 0 stream threads
...

Once I configure StoreType.IN_MEMORY, no queries are executed before the state
is RUNNING:

2023-11-21 15:28:25,511 WARN  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) KafkaStreams has not been started, you can retry after calling start()
2023-11-21 15:28:26,730 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State transition from CREATED to REBALANCING
2023-11-21 15:28:26,752 INFO  [org.apa.kaf.str.pro.int.GlobalStateManagerImpl] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) global-stream-thread [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread] Restoring state for global store store-name
2023-11-21 15:28:29,834 WARN  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) the state store, store-name, is not open.
2023-11-21 15:28:33,670 WARN  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) the state store, store-name, is not open.
2023-11-21 15:28:33,763 INFO  [org.apa.kaf.str.pro.int.GlobalStreamThread] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) global-stream-thread [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread] State transition from CREATED to RUNNING
2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State transition from REBALANCING to RUNNING
2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] Started 0 stream threads
2023-11-21 15:28:36,774 INFO  [com.osr.serKafkaStreamsService] (vert.x-worker-thread-1) new file processed
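In both logs the client only reaches RUNNING after the global store has been restored. If the KafkaStreams instance can be configured before start() is called (in Quarkus the extension creates and starts it, so this may not apply directly), another option is to block until RUNNING is reported instead of retrying on exceptions. KafkaStreams#setStateListener takes a listener with onChange(newState, oldState) and must be set before start(). Below is a JDK-only sketch of such a gate; the class StateGate is a hypothetical name, and DemoState is a local stand-in for KafkaStreams.State so the example runs without a broker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Opens once the target state has been observed; later transitions do not close it again.
final class StateGate<S> {
    private final S target;
    private final CountDownLatch reached = new CountDownLatch(1);

    StateGate(S target) {
        this.target = target;
    }

    // Same shape as KafkaStreams.StateListener#onChange(newState, oldState)
    void onChange(S newState, S oldState) {
        if (target.equals(newState)) {
            reached.countDown();
        }
    }

    // Blocks until the target state was seen; returns false on timeout
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return reached.await(timeout, unit);
    }

    boolean isOpen() {
        return reached.getCount() == 0;
    }

    // Hypothetical stand-in for KafkaStreams.State, only so this sketch runs without Kafka
    enum DemoState { CREATED, REBALANCING, RUNNING }

    public static void main(String[] args) throws InterruptedException {
        StateGate<DemoState> gate = new StateGate<>(DemoState.RUNNING);
        gate.onChange(DemoState.REBALANCING, DemoState.CREATED);
        System.out.println("open after REBALANCING: " + gate.isOpen());
        // Simulate the global stream thread finishing restoration on another thread
        Thread restorer = new Thread(() -> gate.onChange(DemoState.RUNNING, DemoState.REBALANCING));
        restorer.start();
        System.out.println("reached RUNNING: " + gate.await(5, TimeUnit.SECONDS));
    }
}
```

Wiring it up would look roughly like: StateGate<KafkaStreams.State> gate = new StateGate<>(KafkaStreams.State.RUNNING); streams.setStateListener(gate::onChange); streams.start(); then gate.await(...) before the first streams.store(...) call. The latch is one-shot on purpose: it gates startup only and does not track later rebalances.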


Thanks for any input!
Christian