Hi Chris,

*"if I understand your problem correctly, the issue is that you need to
decrement the stock count when you reserve it, rather than splitting it
into a second phase."*

That's exactly the problem. I would need to:

1) Read the OrderPlaced event from Kafka in ProductService...
2) ...query the ProductsStock store to check availability...
3) ...update the Product in the same phase (OrderPlacedEvent processing)
4) ...publish a ProductReserved message

// 1) Read the OrderPlaced event...
@StreamListener(OrderProcessor.INPUT)
public void handleOrder(Map<String, Object> event) {
    logger.info("Event {}", event);
    if ("OrderPlaced".equals(event.get("name"))) {
        Order order = new Order();
        order.setId((String) event.get("orderId"));
        order.setProductId((Integer) event.get("productId"));
        order.setUid(event.get("uid").toString());
        ...
        // 2) Query the ProductsStock store...
        Integer productStock = getProductStock(order.getProductId());
        if (productStock != null && productStock > 0) {
            // 3) Update the ProductsStock store
            ???

            // 4) Publish a new message. No problem here
        }
    }
}

@Override
public Integer getProductStock(Integer id) {
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
        streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    return keyValueStore.get(id);
}

However, the store handle that interactive queries return is a
ReadOnlyKeyValueStore, so the only way I know of updating the store is to
publish a new ProductReserved event that gets processed by the KStream as a
separate step (a new Kafka message):

Map<String, Object> event = new HashMap<>();
event.put("name", "ProductReserved");
event.put("orderId", order.getId());
event.put("productId", order.getProductId());
event.put("quantity", -1);
// 3) Update the ProductsStock store
orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);

This is the separate KStream configuration; notice // 3), where the update
takes place:

@Configuration
public class KStreamsConfig {

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder,
                                 KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {

        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde =
                Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream =
                kStreamBuilder.stream(integerSerde, jsonSerde, "orders");

        // 3) Update the ProductsStock store
        stream.filter((key, value) -> value != null
                    && value.get("name").asText().equals("ProductReserved"))
              .map((key, value) -> new KeyValue<>(value.get("productId").asInt(),
                                                  value.get("quantity").asInt()))
              .groupByKey()
              .reduce((v1, v2) -> v1 + v2, "ProductsStock");

        return stream;
    }
}

I've had a look at StateStoresInTheDSLIntegrationTest.java
<https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java>,
but I still don't get how to integrate the update step (the ??? at // 3)
above) into the handler. I have no idea how to do all of this in the same
phase (a rough sketch of what I'm imagining follows the list):

- Consume a message
- Query a KStreams store
- Update the KStreams store
- Publish a ProductReserved message
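
To make the question concrete, here is the direction I was imagining: a
rough, untested sketch that would replace the groupByKey().reduce() step
inside the kStream() bean above, doing 2), 3) and 4) in one transform() over
a writable state store. The "product-events" output topic, the
ProductReservationFailed event, and the assumption that the "orders" topic
is partitioned by productId (so the store lookup is local) are all my own
guesses:

// My guess: register "ProductsStock" as a writable store instead of
// materialising it through reduce(), then attach it to a transform().
StateStoreSupplier productsStock = Stores.create("ProductsStock")
        .withIntegerKeys()
        .withIntegerValues()
        .persistent()
        .build();
kStreamBuilder.addStateStore(productsStock);

stream.filter((key, value) -> value != null
            && "OrderPlaced".equals(value.get("name").asText()))
      .transform(() -> new Transformer<Integer, JsonNode, KeyValue<Integer, JsonNode>>() {

          private KeyValueStore<Integer, Integer> store;

          @Override
          @SuppressWarnings("unchecked")
          public void init(ProcessorContext context) {
              store = (KeyValueStore<Integer, Integer>) context.getStateStore("ProductsStock");
          }

          @Override
          public KeyValue<Integer, JsonNode> transform(Integer key, JsonNode value) {
              int productId = value.get("productId").asInt();
              Integer stock = store.get(productId);               // 2) query the store
              ObjectNode result = JsonNodeFactory.instance.objectNode();
              result.put("orderId", value.get("orderId").asText());
              result.put("productId", productId);
              if (stock != null && stock > 0) {
                  store.put(productId, stock - 1);                // 3) update it in the same step
                  result.put("name", "ProductReserved");
              } else {
                  result.put("name", "ProductReservationFailed"); // my own failure event
              }
              return KeyValue.pair(productId, (JsonNode) result); // 4) forward downstream
          }

          @Override
          public KeyValue<Integer, JsonNode> punctuate(long timestamp) {
              return null; // nothing scheduled
          }

          @Override
          public void close() {
          }
      }, "ProductsStock")
      .to(integerSerde, jsonSerde, "product-events");             // 4) publish the result

I don't know if registering the store with addStateStore() like this is even
the right approach, hence the question.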

Could you please outline the necessary code to do it?

Thank you so much.
Jose
