Ah, yeah, IQ v2 was a pretty big feature so it hasn't yet been implemented
across all parts of Kafka Streams. You'll notice that we're still actively
putting out new KIPs trying to complete this feature. I don't think there's
any particular reason that GlobalKTables can't be made to work with IQ v2,
but no one's gotten around to doing it yet. I did a quick search and
couldn't even find a JIRA ticket for this, so it doesn't seem to be on
anyone's radar.

If this is something you'd like to see implemented, go ahead and file a
ticket for it on JIRA
<https://issues.apache.org/jira/projects/KAFKA/issues/>. Honestly, it may
have been overlooked completely. Global tables can sometimes get
overshadowed since they're relatively less common. So definitely go ahead
and file a ticket to hopefully kick off the conversation.

I'm not saying it'll get picked up right away, but if there's no ticket and
no one asking for it, then it might never happen at all. Of course, we
accept KIPs -- if you really need this feature, or if you've ever been
interested in contributing to Apache Kafka/Kafka Streams, consider
implementing it yourself!

On Wed, Nov 22, 2023 at 12:26 AM Christian Zuegner
<christian.zueg...@ams-osram.com.invalid> wrote:

> Hi Sophie,
>
> thanks a lot for your tip! I've implemented a StateListener to block
> queries while the state is not RUNNING. This now works perfectly for our
> use case!
>
>
> In the meantime I noticed the Interactive Query API v2 and gave it a try.
> Unfortunately, it does not seem to work with GlobalKTable. When I try to
> run this:
>
> return streams.query(
>     StateQueryRequest.inStore(STORE_NAME).withQuery(KeyQuery.withKey(key)));
>
> I got: "Global stores do not yet support the KafkaStreams#query API. Use
> KafkaStreams#store instead."
>
> From my point of view, it would be great if this worked and behaved as it
> does with the IN_MEMORY StoreType, since that is straightforward to use.
>
> Do you see a chance of getting Interactive Query v2 to work with GlobalKTable?
>
> Kind regards,
> Christian
>
> -----Original Message-----
> From: Sophie Blee-Goldman <sop...@responsive.dev>
> Sent: Wednesday, November 22, 2023 1:51 AM
> To: christian.zueg...@ams-osram.com.invalid
> Cc: users@kafka.apache.org
> Subject: Re: GlobalKTable with RocksDB - queries before state RUNNING?
>
> Just to make sure I understand the logs, you're saying the "new file
> processed" lines represent store queries, and presumably the
> com.osr.serKafkaStreamsService is your service that's issuing these queries?
>
> You need to wait for the app to finish restoring state before querying it.
> Based on this message -- "KafkaStreams has not been started, you can retry
> after calling start()" -- I assume you're kicking off the querying service
> right away and blocking queries until after KafkaStreams#start is called.
> But you need to wait for it to actually finish starting up, not just for
> start() to be called. The best way to do this is by setting a state
> listener via KafkaStreams#setStateListener, and then using this to listen
> in on the KafkaStreams.State and blocking the queries until the state has
> changed to RUNNING.
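
The state-listener approach described above can be sketched as a small gate that query threads wait on before touching the store. This is a minimal sketch under stated assumptions, not Kafka Streams code: `QueryGate` and `awaitRunning` are hypothetical names, and a nested `State` enum stands in for `KafkaStreams.State` so the example compiles without kafka-streams on the classpath. In a real application the class would use `KafkaStreams.State` directly and be registered with `streams.setStateListener(gate::onChange)` before calling `streams.start()`, since the listener can only be set while the client is still in the CREATED state.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical helper: lets query threads wait until the Streams client is RUNNING.
 * The nested State enum is a stand-in for KafkaStreams.State (assumption for the sketch).
 */
final class QueryGate {
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition stateChanged = lock.newCondition();
    private State current = State.CREATED;

    /** Same shape as KafkaStreams.StateListener#onChange(newState, oldState). */
    void onChange(State newState, State oldState) {
        lock.lock();
        try {
            current = newState;
            stateChanged.signalAll(); // wake every waiting query thread so it re-checks the state
        } finally {
            lock.unlock();
        }
    }

    /** Returns true if the client is RUNNING, or becomes RUNNING before the timeout elapses. */
    boolean awaitRunning(long timeout, TimeUnit unit) throws InterruptedException {
        long remainingNanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (current != State.RUNNING) {
                if (remainingNanos <= 0L) {
                    return false; // still restoring (or shut down): the caller should not query yet
                }
                remainingNanos = stateChanged.awaitNanos(remainingNanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueryGate gate = new QueryGate();
        // Nothing has reached RUNNING yet, so the wait times out.
        System.out.println(gate.awaitRunning(50, TimeUnit.MILLISECONDS)); // false

        // Simulate the transitions seen in the logs: CREATED -> REBALANCING -> RUNNING.
        Thread streamsSide = new Thread(() -> {
            gate.onChange(State.REBALANCING, State.CREATED);
            gate.onChange(State.RUNNING, State.REBALANCING);
        });
        streamsSide.start();
        System.out.println(gate.awaitRunning(5, TimeUnit.SECONDS)); // true
        streamsSide.join();
    }
}
```

The gate checks the current state on every call instead of latching the first RUNNING transition, so a caller that consults it before each query is also held back if the client later leaves RUNNING. Note that the check and the subsequent query are not atomic; the gate only narrows the window, it does not make the store transactional.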
>
> In case you're curious about why this seems to work with in-memory stores
> but not with RocksDB, it seems like in the in-memory case, the queries that
> are attempted during restoration are blocked due to the store being closed
> (according to "(Quarkus Main Thread) the state store, store-name, is not
> open.")
>
> So why is the store closed for most of the restoration in the in-memory
> case only? This gets a bit into the weeds, but it has to do with the
> sequence of events in starting up a state store. When the global thread
> starts up, it'll first loop over all its state stores and call #init on
> them. Two things have to happen inside #init: the store is opened, and the
> store registers itself with the ProcessorContext. The #register step involves
> various things, including a call to fetch the end offsets of the topic for
> global state stores. This is a blocking call, so the store might stay
> inside the #register call for a relatively long while.
>
> For RocksDB stores, we open the store first and then call #register, so by
> the time the GlobalStreamThread is sitting around waiting on the end
> offsets response, the store is open and your queries are getting through to
> it. However the in-memory store actually registers itself *first*, before
> marking itself as open, and so it remains closed for most of the time it
> spends in restoration and blocks any query attempts during this time.
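
The ordering argument above can be illustrated with a toy model. This is not Kafka's implementation: `InitOrderDemo`, `ToyStore`, and `queryDuringRegister` are hypothetical names, the blocking registration (end-offsets fetch) is replaced by a callback, and the "concurrent" query is issued from inside that callback so the outcome is deterministic. A store that opens before registering answers an early query even though nothing has been restored, while a store that registers first rejects it.

```java
/**
 * Toy model (not Kafka Streams code) of how the order of "open" and
 * "register" inside a store's init affects queries issued during restoration.
 */
final class InitOrderDemo {

    static final class ToyStore {
        private final boolean openBeforeRegister;
        private boolean open = false;

        ToyStore(boolean openBeforeRegister) {
            this.openBeforeRegister = openBeforeRegister;
        }

        /** `register` stands in for the blocking registration / end-offsets fetch. */
        void init(Runnable register) {
            if (openBeforeRegister) {   // RocksDB-like ordering: open, then register
                open = true;
                register.run();
            } else {                    // in-memory-like ordering: register, then open
                register.run();
                open = true;
            }
        }

        String get(String key) {
            if (!open) {
                throw new IllegalStateException("the state store is not open");
            }
            return null; // nothing has been restored yet, so every lookup misses
        }
    }

    /** Issues a query while registration is still in progress and reports the outcome. */
    static String queryDuringRegister(ToyStore store) {
        StringBuilder outcome = new StringBuilder();
        store.init(() -> {
            try {
                outcome.append("get -> ").append(store.get("some-key"));
            } catch (IllegalStateException e) {
                outcome.append("rejected: ").append(e.getMessage());
            }
        });
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(queryDuringRegister(new ToyStore(true)));  // get -> null
        System.out.println(queryDuringRegister(new ToyStore(false))); // rejected: the state store is not open
    }
}
```

This mirrors the two log excerpts in this thread: the RocksDB run logs "new file processed" before the transition to RUNNING, while the in-memory run logs "the state store, store-name, is not open." until restoration completes.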
>
> I suppose it would make sense to align the two store implementations to
> have the same behavior, and the in-memory store is probably technically
> more correct. But in the end you really should just wait for the
> KafkaStreams.State to get to RUNNING before querying the state store, as
> that's the only true guarantee.
>
> Hope this helps!
>
> -Sophie
>
> On Tue, Nov 21, 2023 at 6:44 AM Christian Zuegner
> <christian.zueg...@ams-osram.com.invalid> wrote:
>
> > Hi,
> >
> > we have the following problem: a Kafka topic of ~20 MB is made available
> > as a GlobalKTable for queries. When using RocksDB, the GlobalKTable is
> > ready for queries instantly, even before it has finished reading the
> > data, and all get() requests return null. After a few seconds the data
> > can be queried correctly, but this is too late for our application. Once
> > we switch to IN_MEMORY, we get the expected behavior: the store is only
> > ready after all data has been read from the topic.
> >
> > How can we achieve the same behavior with the RocksDB setup?
> >
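> > Snippet to build the Kafka Streams topology:
> >
> > builder.globalTable(
> >     "topic-name",
> >     Consumed.with(Serdes.String(), Serdes.String()),
> >     // Explicit type witness: without it, Materialized.as(...) is inferred as
> >     // Materialized<Object, Object, StateStore> and the call does not type-check.
> >     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
> >         .withStoreType(Materialized.StoreType.ROCKS_DB)
> > );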
> >
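> > Querying the table:
> >
> > ReadOnlyKeyValueStore<String, String> getStore() {
> >     while (true) {
> >         try {
> >             return streams.store(StoreQueryParameters.fromNameAndType(
> >                     FileCrawlerKafkaTopologyProducer.STORE_NAME,
> >                     QueryableStoreTypes.keyValueStore()));
> >         } catch (InvalidStateStoreException e) {
> >             // Store not queryable yet (client not started or store not open): retry.
> >             logger.warn(e.getMessage());
> >             try {
> >                 Thread.sleep(3000);
> >             } catch (InterruptedException ie) {
> >                 // Restore the interrupt flag instead of swallowing it, then give up.
> >                 Thread.currentThread().interrupt();
> >                 throw new IllegalStateException("Interrupted while waiting for the store", ie);
> >             }
> >         }
> >     }
> > }
> >
> > The store is queried with getStore().get(key); this is where we get the
> > null values.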
> >
> > This is the log output with RocksDB; the first query happens before the
> > state is RUNNING:
> >
> > ...
> > 2023-11-21 15:15:40,629 INFO  [com.osr.serKafkaStreamsService]
> > (Quarkus Main Thread) wait for kafka streams store to get ready:
> > KafkaStreams has not been started, you can retry after calling start()
> > 2023-11-21 15:15:41,781 INFO  [org.apa.kaf.str.KafkaStreams]
> > (pool-10-thread-1) stream-client
> > [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State transition
> > from CREATED to REBALANCING
> > 2023-11-21 15:15:41,819 INFO
> > [org.apa.kaf.str.sta.int.RocksDBTimestampedStore]
> > (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread)
> > Opening store store-name in regular mode
> > 2023-11-21 15:15:41,825 INFO
> > [org.apa.kaf.str.pro.int.GlobalStateManagerImpl]
> > (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread)
> > global-stream-thread
> > [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread]
> > Restoring state for global store store-name
> > 2023-11-21 15:15:43,753 INFO  [io.quarkus] (Quarkus Main Thread) demo
> > 1.0-SNAPSHOT on JVM (powered by Quarkus 3.2.8.Final) started in 5.874s.
> > 2023-11-21 15:15:43,754 INFO  [io.quarkus] (Quarkus Main Thread)
> > Profile dev activated. Live Coding activated.
> > 2023-11-21 15:15:43,756 INFO  [io.quarkus] (Quarkus Main Thread)
> > Installed features: [apicurio-registry-avro, cdi, config-yaml, kafka-client,
> > kafka-streams, logging-gelf, smallrye-context-propagation,
> > smallrye-fault-tolerance, smallrye-reactive-messaging,
> > smallrye-reactive-messaging-kafka, vertx]
> > 2023-11-21 15:15:44,195 INFO  [com.osr.ser.KafkaStreamsService]
> > (vert.x-worker-thread-1) new file processed
> > 2023-11-21 15:15:44,629 INFO
> > [org.apa.kaf.str.pro.int.GlobalStreamThread]
> > (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread)
> > global-stream-thread
> > [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread]
> > State transition from CREATED to RUNNING
> > 2023-11-21 15:15:44,631 INFO  [org.apa.kaf.str.KafkaStreams]
> > (topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2-GlobalStreamThread)
> > stream-client [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State
> > transition from REBALANCING to RUNNING
> > 2023-11-21 15:15:44,631 INFO  [org.apa.kaf.str.KafkaStreams]
> > (pool-10-thread-1) stream-client
> > [topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] Started 0 stream
> > threads ...
> >
> > Once I configure StoreType.IN_MEMORY, no queries are done before the
> > state is RUNNING:
> >
> > 2023-11-21 15:28:25,511 WARN  [com.osr.serKafkaStreamsService]
> > (Quarkus Main Thread) KafkaStreams has not been started, you can retry
> > after calling start()
> > 2023-11-21 15:28:26,730 INFO  [org.apa.kaf.str.KafkaStreams]
> > (pool-10-thread-1) stream-client
> > [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State transition
> > from CREATED to REBALANCING
> > 2023-11-21 15:28:26,752 INFO
> > [org.apa.kaf.str.pro.int.GlobalStateManagerImpl]
> > (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread)
> > global-stream-thread
> > [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread]
> > Restoring state for global store store-name
> > 2023-11-21 15:28:29,834 WARN  [com.osr.serKafkaStreamsService]
> > (Quarkus Main Thread) the state store, store-name, is not open.
> > 2023-11-21 15:28:33,670 WARN  [com.osr.serKafkaStreamsService]
> > (Quarkus Main Thread) the state store, store-name, is not open.
> > 2023-11-21 15:28:33,763 INFO
> > [org.apa.kaf.str.pro.int.GlobalStreamThread]
> > (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread)
> > global-stream-thread
> > [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread]
> > State transition from CREATED to RUNNING
> > 2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams]
> > (topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7-GlobalStreamThread)
> > stream-client [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] State
> > transition from REBALANCING to RUNNING
> > 2023-11-21 15:28:33,765 INFO  [org.apa.kaf.str.KafkaStreams]
> > (pool-10-thread-1) stream-client
> > [topic-name-e459f74c-cd36-4595-a8c6-bd0aad9ae0a7] Started 0 stream
> > threads
> > 2023-11-21 15:28:36,774 INFO  [com.osr.serKafkaStreamsService]
> > (vert.x-worker-thread-1) new file processed
> >
> >
> > Thanks for any input!
> > Christian
> >
>
