Hi Chris,

*"if I understand your problem correctly, the issue is that you need
to decrement the stock count when you reserve it, rather than splitting it
into a second phase."*

That's exactly the problem. I would need to:

1) Read the OrderPlaced event from Kafka in ProductService...
2) ...query the ProductsStock store to check availability...
3) ...update the Product in the same phase (OrderPlacedEvent processing)
4) ...publish a ProductReserved message

// 1) Read the OrderPlaced event...
public void handleOrder(Map<String, Object> event) {
    logger.info("Event {}", event);
    Order order = new Order();
    // 2) Query the ProductsStockStore...
    Integer productStock =
    if (productStock != null && productStock > 0) {
        // 3) Update the ProductsStockStore

        // 4) Publish a new message. No problem here
    }
}

public Integer getProductStock(Integer id) {
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
        streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    return keyValueStore.get(id);
}

However, the only way I know of to update the store is to publish a new
ProductReserved event, which the KStream then processes as a separate step
(a new Kafka message):

    Map<String, Object> event = new HashMap<>();
    event.put("name", "ProductReserved");
    event.put("orderId", order.getId());
    event.put("productId", order.getProductId());
    event.put("quantity", -1);
    // 3) Update the ProductStore

This is the separate KStream config; notice // 3), where the update takes
place:

public class KStreamsConfig {

    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder,
            KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new
        final Deserializer<JsonNode> jsonDeserializer = new
        final Serde<JsonNode> jsonSerde =
            Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
        KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde,
            jsonSerde, "orders");

        // 3) Update the ProductStore
        stream.filter( (key, value) -> value != null &&
            .map( (key, value) -> {
                return new KeyValue<>(value.get("productId").asInt(),
            }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock");
        return stream;
    }
}

I've had a look at StateStoresInTheDSLIntegrationTest.java, but I still
don't get how to integrate the update step in // 2). I have no idea how I
can do all of this in the same phase:

- Consume a message
- Query a KStreams store
- Update the KStreams store
- Publish a ProductReserved message.
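
To make the goal concrete, here is roughly the single-phase behaviour I'm
after, as a plain-Java sketch with no Kafka involved. A HashMap stands in for
the ProductsStock store and a List stands in for the outgoing topic. The class
name SinglePhaseReservation and the helpers setStock and getPublished are made
up purely for illustration; this is not how Kafka Streams does it, only the
logic I would like to express:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ProductService. A HashMap plays the role of the
// "ProductsStock" store and a List plays the role of the outgoing topic.
class SinglePhaseReservation {

    private final Map<Integer, Integer> productsStock = new HashMap<>();
    private final List<Map<String, Object>> published = new ArrayList<>();

    // Illustration-only helper to seed the stand-in store.
    public void setStock(int productId, int quantity) {
        productsStock.put(productId, quantity);
    }

    public Integer getProductStock(Integer id) {
        return productsStock.get(id);
    }

    // Illustration-only helper to inspect what was "published".
    public List<Map<String, Object>> getPublished() {
        return published;
    }

    // 1) Handle one OrderPlaced event; steps 2-4 happen in the same call.
    public boolean handleOrder(int orderId, int productId) {
        // 2) Query the stock
        Integer productStock = getProductStock(productId);
        if (productStock == null || productStock <= 0) {
            return false; // unknown product or nothing left to reserve
        }
        // 3) Update the stock in the same phase
        productsStock.put(productId, productStock - 1);

        // 4) Publish a ProductReserved message
        Map<String, Object> event = new HashMap<>();
        event.put("name", "ProductReserved");
        event.put("orderId", orderId);
        event.put("productId", productId);
        event.put("quantity", -1);
        published.add(event);
        return true;
    }

    public static void main(String[] args) {
        SinglePhaseReservation service = new SinglePhaseReservation();
        service.setStock(42, 1);
        System.out.println(service.handleOrder(1, 42));  // true
        System.out.println(service.handleOrder(2, 42));  // false
        System.out.println(service.getProductStock(42)); // 0
    }
}
```

The point is that the second order is rejected because the stock was already
decremented while the first order was being handled, instead of waiting for a
separate ProductReserved message to be processed.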

Could you please outline the necessary code to do it?

Thank you so much.
