Hi guys,
I am trying to get more familiar with Kafka, so I built a small Spring Boot
application.
I have a simple Kafka setup with a topic "stock", to which I write records
with a productId as key and an Integer stock change as value,
e.g. {10001, -5}; {10001, 2}.
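To make the expected result concrete, here is a minimal plain-Java sketch (no Kafka involved) of the aggregation I am after: the stock level per product is the sum of all changes seen for that key. The class name StockLevelSketch and the method currentStock are made up for this illustration and are not part of my application.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StockLevelSketch {

    // Hypothetical helper: folds (productId, change) pairs into a stock level
    // per product by summing the changes, which is what the reduce should do.
    static Map<Long, Integer> currentStock(long[] productIds, int[] changes) {
        Map<Long, Integer> stock = new LinkedHashMap<>();
        for (int i = 0; i < productIds.length; i++) {
            // merge() stores the change for a new key, otherwise adds it to the old value
            stock.merge(productIds[i], changes[i], Integer::sum);
        }
        return stock;
    }

    public static void main(String[] args) {
        long[] productIds = {10001L, 10001L};
        int[] changes = {-5, 2};
        System.out.println(currentStock(productIds, changes)); // prints {10001=-3}
    }
}
```

So for the two example records above, I would expect the view to return -3 for product 10001.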
Now I want to create a materialized view to get the current stock level of
a specific product:
    @Bean
    public KafkaStreams stockStreams() {
        StreamsConfig config = stockStreamsConfig();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream<Long, Integer> stockStream = streamsBuilder.stream(STOCK_TOPIC);
        stockStream.groupByKey()
                .reduce((value1, value2) -> value1 + value2, Materialized.as(STOCK_STORE));
        KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), config);
        streams.start();
        return streams;
    }
The problem is that when I try to access the store:
    stockStreams.store(STOCK_STORE, QueryableStoreTypes.keyValueStore())
it is no longer available:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, stockStore, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60) ~[kafka-streams-1.0.2.jar:na]
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1043) ~[kafka-streams-1.0.2.jar:na]
If I use count() instead of reduce(), I can access the materialized view
(though of course it then holds the wrong stock level).
Does anyone have any advice?
Best regards,
Toni