Filed https://issues.apache.org/jira/browse/KAFKA-16295

Also, for global store support, we do have a ticket already: https://issues.apache.org/jira/browse/KAFKA-13523

It's actually a little bit more involved due to position tracking... I guess we might need a KIP to fix this.

And yes: if anybody is interested in picking it up, that would be great. We did push a couple of IQv2 improvements into the upcoming 3.7 release, and of course we hope to eventually make IQv2 the default and to deprecate IQv1.

We should actually also start to document IQv2... https://issues.apache.org/jira/browse/KAFKA-16262


-Matthias

On 11/21/23 4:50 PM, Sophie Blee-Goldman wrote:
Just to make sure I understand the logs, you're saying the "new file
processed" lines represent store queries, and presumably the
com.osr.serKafkaStreamsService is your service that's issuing these queries?

You need to wait for the app to finish restoring state before querying it.
Based on this message -- "KafkaStreams has not been started, you can retry
after calling start()" -- I assume you're kicking off the querying service
right away and blocking queries until after KafkaStreams#start is called.
But you need to wait for it to actually finish starting up, not just for
start() to be called. The best way to do this is by setting a state
listener via KafkaStreams#setStateListener, and then using this to listen
in on the KafkaStreams.State and blocking the queries until the state has
changed to RUNNING.
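A minimal sketch of that approach, assuming Java 8+ and only the JDK. StateGate is a hypothetical helper, not a Kafka Streams class. Its onChange(newState, oldState) has the same shape as KafkaStreams.StateListener#onChange, so it could be registered with streams.setStateListener(gate::onChange) before streams.start(). The query path then waits on awaitReady(...) instead of retrying on InvalidStateStoreException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not part of Kafka Streams): lets query threads block
 * until a state machine first reports a "ready" state.
 * onChange has the same (newState, oldState) shape as
 * KafkaStreams.StateListener#onChange, so with Kafka Streams it could be
 * registered via streams.setStateListener(gate::onChange) before start().
 */
final class StateGate<S extends Enum<S>> {
    private final S readyState;   // e.g. KafkaStreams.State.RUNNING
    private final S failedState;  // e.g. KafkaStreams.State.ERROR
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile S lastState;

    StateGate(S readyState, S failedState) {
        this.readyState = readyState;
        this.failedState = failedState;
    }

    /** Called on every state transition; opens the gate on ready or failed. */
    void onChange(S newState, S oldState) {
        lastState = newState;
        if (newState == readyState || newState == failedState) {
            latch.countDown(); // later calls are no-ops once the count is zero
        }
    }

    /**
     * Returns true once the ready state has been reached, false if the timeout
     * expires first, and throws if the most recent state is the failed state.
     */
    boolean awaitReady(long timeout, TimeUnit unit) throws InterruptedException {
        if (!latch.await(timeout, unit)) {
            return false; // e.g. the global store is still restoring
        }
        if (lastState == failedState) {
            throw new IllegalStateException("client is in state " + failedState);
        }
        return true;
    }
}
```

In the application this would be created as new StateGate<>(KafkaStreams.State.RUNNING, KafkaStreams.State.ERROR). The listener has to be set while the instance is still in CREATED, because setStateListener throws IllegalStateException otherwise. The query code would only call streams.store(...) after awaitReady(...) returns true. This sketch only tracks the first arrival at RUNNING, and any later transitions would need their own handling.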

In case you're curious why this seems to work with in-memory stores
but not with RocksDB: it seems like in the in-memory case, the queries that
are attempted during restoration are blocked because the store is still closed
(according to "(Quarkus Main Thread) the state store, store-name, is not
open.")

So why is the store closed for most of the restoration in the in-memory
case only? This gets a bit into the weeds, but it has to do with the
sequence of events in starting up a state store. When the global thread
starts up, it'll first loop over all its state stores and call #init on
them. Two things have to happen inside #init: the store is opened, and the
store registers itself with the ProcessorContext. The #register call involves
various things, including a call to fetch the end offsets of the topic for
global state stores. This is a blocking call, so the store might stay
inside the #register call for a relatively long while.

For RocksDB stores, we open the store first and then call #register, so by
the time the GlobalStreamThread is sitting around waiting on the end
offsets response, the store is open and your queries are getting through to
it. However, the in-memory store actually registers itself *first*, before
marking itself as open, and so it remains closed for most of the time it
spends in restoration and blocks any query attempts during this time.
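To make the ordering difference concrete, here is a toy model in plain Java. ToyStore is hypothetical and does not reproduce Kafka's actual RocksDB or in-memory store code. Its register callback only stands in for the blocking end-offset fetch and restoration that happen inside #register. A query that lands in that window sees one of two things: a closed store, where it is rejected, or an open but partially restored store, where it gets null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/**
 * Toy model, NOT Kafka's real store classes: init() performs "open" and
 * "register" in a configurable order. The register callback stands in for
 * the blocking end-offset fetch plus restoration done by the global thread.
 * Not thread-safe; intended for a single-threaded demonstration.
 */
final class ToyStore {
    private final Map<String, String> data = new HashMap<>();
    private final boolean openBeforeRegister;
    private volatile boolean open;

    ToyStore(boolean openBeforeRegister) {
        this.openBeforeRegister = openBeforeRegister;
    }

    /** Like a query: rejected while closed, otherwise a plain lookup. */
    String get(String key) {
        if (!open) {
            throw new IllegalStateException("the state store is not open");
        }
        return data.get(key); // null if restoration has not reached this key yet
    }

    /** register receives this store so it can simulate queries mid-restoration. */
    void init(Consumer<ToyStore> register) {
        if (openBeforeRegister) {   // RocksDB-like ordering: open, then register
            open = true;
            register.accept(this);
        } else {                    // in-memory-like ordering: register, then open
            register.accept(this);
            open = true;
        }
    }

    /** Restore callback: writes records regardless of the open flag. */
    void restore(String key, String value) {
        data.put(key, value);
    }
}
```

Flipping the constructor flag is the only difference between the two variants. The open-first variant answers an early query with null, and the register-first variant rejects it as "not open", which matches the two log outputs in this thread.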

I suppose it would make sense to align the two store implementations to
have the same behavior, and the in-memory store is probably technically
more correct. But in the end you really should just wait for the
KafkaStreams.State to get to RUNNING before querying the state store, as
that's the only true guarantee.

Hope this helps!

-Sophie

On Tue, Nov 21, 2023 at 6:44 AM Christian Zuegner
<christian.zueg...@ams-osram.com.invalid> wrote:

Hi,

we have the following problem: a Kafka topic of ~20 MB is made
available as a GlobalKTable for queries. With RocksDB, the GlobalKTable is
ready for queries instantly, even before all data has been read, and
all get() requests return null. After a few seconds the data is queryable
correctly, but this is too late for our application. Once we switch to
IN_MEMORY, we get the expected behavior: the store is only ready after all
data has been read from the topic.

How can we achieve the same behavior with the RocksDB setup?

Snippet to build the Kafka Streams topology:

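// Explicit type witness: without it, Materialized.as(...) in a call chain
// infers <Object, Object, StateStore> and globalTable(...) does not compile.
builder.globalTable(
    "topic-name",
    Consumed.with(Serdes.String(), Serdes.String()),
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
        .withStoreType(Materialized.StoreType.ROCKS_DB)
);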

Querying the table:

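// Retry until the store can be obtained; note that this only waits for
// the store to be reachable, not for KafkaStreams to reach RUNNING.
while (true) {
    try {
        return streams.store(
            StoreQueryParameters.fromNameAndType(
                FileCrawlerKafkaTopologyProducer.STORE_NAME,
                QueryableStoreTypes.keyValueStore()));
    } catch (InvalidStateStoreException e) {
        logger.warn(e.getMessage());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException ie) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for store", ie);
        }
    }
}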

The store is then queried with getStore().get(key); this is where we get
the null values.

This is the log output with RocksDB (the first query happens before the state is RUNNING):

...
2023-11-21 15:15:40,629 INFO  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) wait for kafka streams store to get ready: KafkaStreams has not been started, you can retry after calling start()
2023-11-21 15:15:41,781 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State transition from CREATED to REBALANCING
2023-11-21 15:15:41,819 INFO  [org.apa.kaf.str.sta.int.RocksDBTimestampedStore] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) Opening store store-name in regular mode
2023-11-21 15:15:41,825 INFO  [org.apa.kaf.str.pro.int.GlobalStateManagerImpl] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) global-stream-thread [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread] Restoring state for global store store-name
2023-11-21 15:15:43,753 INFO  [io.quarkus] (Quarkus Main Thread) demo 1.0-SNAPSHOT on JVM (powered by Quarkus 3.2.8.Final) started in 5.874s.
2023-11-21 15:15:43,754 INFO  [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
2023-11-21 15:15:43,756 INFO  [io.quarkus] (Quarkus Main Thread) Installed features: [apicurio-registry-avro, cdi, config-yaml, kafka-client, kafka-streams, logging-gelf, smallrye-context-propagation, smallrye-fault-tolerance, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
2023-11-21 15:15:44,195 INFO  [com.osr.ser.KafkaStreamsService] (vert.x-worker-thread-1) new file processed
2023-11-21 15:15:44,629 INFO  [org.apa.kaf.str.pro.int.GlobalStreamThread] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) global-stream-thread [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread] State transition from CREATED to RUNNING
2023-11-21 15:15:44,631 INFO  [org.apa.kaf.str.KafkaStreams] (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State transition from REBALANCING to RUNNING
2023-11-21 15:15:44,631 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] Started 0 stream threads
...

Once I configure StoreType.IN_MEMORY, no queries succeed before the
state is RUNNING:

2023-11-21 15:28:25,511 WARN  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) KafkaStreams has not been started, you can retry after calling start()
2023-11-21 15:28:26,730 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State transition from CREATED to REBALANCING
2023-11-21 15:28:26,752 INFO  [org.apa.kaf.str.pro.int.GlobalStateManagerImpl] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) global-stream-thread [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread] Restoring state for global store store-name
2023-11-21 15:28:29,834 WARN  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) the state store, store-name, is not open.
2023-11-21 15:28:33,670 WARN  [com.osr.serKafkaStreamsService] (Quarkus Main Thread) the state store, store-name, is not open.
2023-11-21 15:28:33,763 INFO  [org.apa.kaf.str.pro.int.GlobalStreamThread] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) global-stream-thread [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread] State transition from CREATED to RUNNING
2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams] (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State transition from REBALANCING to RUNNING
2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams] (pool-10-thread-1) stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] Started 0 stream threads
2023-11-21 15:28:36,774 INFO  [com.osr.serKafkaStreamsService] (vert.x-worker-thread-1) new file processed


Thanks for any input!
Christian