Hi Francesco,
I tried that already, but the store never becomes ready. However, when I
restart my application, the store is ready instantly. I can reproduce
this.
Could it be that if I create a store while there is no message in my
stream yet, the store never becomes ready?
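To make the "never ready" case show up as a timeout instead of an endless loop, the wait could be bounded. The following is only a minimal sketch in plain Java: `waitUntilAvailable` and the class name are hypothetical, and the lambda in `main` is a stand-in for the real lookup. In the application the supplier would wrap `streams.store(STOCK_STORE, QueryableStoreTypes.keyValueStore())`, and the exception caught would be `InvalidStateStoreException`, which is a `RuntimeException`.

```java
import java.time.Duration;
import java.util.function.Supplier;

public class BoundedStoreWait {

    // Hypothetical helper: retries the lookup until it succeeds or the timeout elapses.
    public static <T> T waitUntilAvailable(Supplier<T> lookup, Duration timeout, Duration pause)
            throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return lookup.get();
            } catch (RuntimeException notReady) {
                // Give up once the deadline has passed, keeping the last failure as the cause.
                if (System.nanoTime() - deadline >= 0) {
                    throw new IllegalStateException("store not queryable after " + timeout, notReady);
                }
                Thread.sleep(pause.toMillis());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the real store lookup: fails twice, then succeeds.
        final int[] attempts = {0};
        String store = waitUntilAvailable(() -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("not ready");
            }
            return "stockStore";
        }, Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println(store + " available after " + attempts[0] + " attempts");
    }
}
```

With a bound like this, a store that never becomes queryable fails with an exception carrying the last cause, which is easier to report than a hanging thread.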
The configuration looks like this:

    StreamsConfig config = stockStreamsConfig();
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    KStream<Long, Long> stockStream = streamsBuilder.stream(STOCK_TOPIC);
    stockStream.groupByKey()
        .reduce((value1, value2) -> {
            LOG.info("Reduce {} and {}", value1, value2);
            return value1 + value2;
        }, Materialized.as(STOCK_STORE));
    KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), config);
    streams.start();
    return streams;

So if I use reduce, must there already be some entries in the topic before
the store is created and becomes ready?
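For comparison, this is what I expect the reduce step to compute, written as a plain Java sketch without Kafka. It assumes that reduce stores the first value for a key as-is and only calls the reducer from the second value onwards, which is also how `Map.merge` behaves. The class `ReduceSketch` and the method `reduceByKey` are hypothetical names; the sample records are the ones from my first mail.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

public class ReduceSketch {

    // Reduce-like semantics per key: the first value is stored as-is, later ones are combined.
    public static Map<Long, Long> reduceByKey(long[][] records, BinaryOperator<Long> reducer) {
        Map<Long, Long> view = new LinkedHashMap<>();
        for (long[] entry : records) {
            // Map.merge only calls the reducer when the key already has a value.
            view.merge(entry[0], entry[1], reducer);
        }
        return view;
    }

    public static void main(String[] args) {
        // {productId, stock change}, as in the "stock" topic example.
        long[][] records = {{10001L, -5L}, {10001L, 2L}};
        Map<Long, Long> view = reduceByKey(records, (value1, value2) -> {
            System.out.println("Reduce " + value1 + " and " + value2);
            return value1 + value2;
        });
        System.out.println(view);
    }
}
```

If the assumption holds, the log line inside the reducer would only appear once a key has received its second record, so a missing log line does not by itself mean that nothing was consumed.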
Best regards
Toni
Francesco Frontera <[email protected]> wrote on Tue, 11 Sep 2018
at 14:10:
> Hi Toni,
>
> it looks like the local Kafka Streams instance is not yet ready; for
> this reason, the local state store can't be queried yet.
> You could try the method explained in the Kafka Streams documentation
> <https://docs.confluent.io/current/streams/faq.html#id28> to avoid this
> problem:
>
> // Example: Wait until the store of type T is queryable. When it is,
> // return a reference to the store.
> public static <T> T waitUntilStoreIsQueryable(final String storeName,
>                                               final QueryableStoreType<T> queryableStoreType,
>                                               final KafkaStreams streams) throws InterruptedException {
>   while (true) {
>     try {
>       return streams.store(storeName, queryableStoreType);
>     } catch (InvalidStateStoreException ignored) {
>       // store not yet ready for querying
>       Thread.sleep(100);
>     }
>   }
> }
>
> I hope I've been of some help.
>
> Best regards,
> *Francesco Frontera*
>
> *Software Engineer @ Radicalbit*
> *Via Borsieri 41, 20159, Milano - IT*
>
>
> On Tue, 11 Sep 2018 at 13:49, Toni Zehnder
> <[email protected]> wrote:
>
> > Hi guys,
> > I am trying to get more familiar with Kafka, so I made a small Spring Boot
> > application.
> > I have a simple Kafka setup with a topic "stock", into which I put a
> > productId as key and an Integer as stock change, e.g. {10001, -5};{10001, 2}.
> > Now I want to create a materialized view to get the current stock level
> > of a specific product:
> >
> > @Bean
> > public KafkaStreams stockStreams() {
> >     StreamsConfig config = stockStreamsConfig();
> >     StreamsBuilder streamsBuilder = new StreamsBuilder();
> >
> >     KStream<Long, Integer> stockStream = streamsBuilder.stream(STOCK_TOPIC);
> >
> >     stockStream.groupByKey()
> >         .reduce((value1, value2) -> value1 + value2, Materialized.as(STOCK_STORE));
> >     KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), config);
> >
> >     streams.start();
> >
> >     return streams;
> > }
> >
> > The problem occurs when I try to access the store:
> >
> > stockStreams.store(STOCK_STORE, QueryableStoreTypes.keyValueStore())
> >
> > it's not available anymore:
> > org.apache.kafka.streams.errors.InvalidStateStoreException: the state
> > store, stockStore, may have migrated to another instance.
> > at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
> > ~[kafka-streams-1.0.2.jar:na]
> > at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043)
> > ~[kafka-streams-1.0.2.jar:na]
> >
> > If I use count instead of reduce, I can access the materialized view
> > (but of course it then holds the wrong stock level).
> >
> > Does anyone have any advice?
> >
> > best regards
> > Toni
> >
>