Hi Chris,
*"if I understand your problem correctly, the issue is that you need
to decrement the stock count when you reserve it, rather than splitting it*
*into a second phase."*
That's exactly the problem. I would need to:
1) Read the OrderPlaced event from Kafka in ProductService...
2) ...query the ProductsStock store to check availability...
3) ...update the Product in the same phase (OrderPlacedEvent processing)
4) ...publish a ProductReserved message
// 1) Read the OrderPlaced event...
@StreamListener(OrderProcessor.INPUT)
public void handleOrder(Map<String, Object> event){
    logger.info("Event {}", event);
    if("OrderPlaced".equals(event.get("name"))){
        Order order = new Order();
        order.setId((String)event.get("orderId"));
        order.setProductId((Integer)(event.get("productId")));
        order.setUid(event.get("uid").toString());
        ...
        // 2) Query the ProductsStockStore...
        Integer productStock = getProductStock(order.getProductId());
        if(productStock != null && productStock > 0) {
            // 3) Update the ProductsStockStore
            ???
            // 4) Publish a new message. No problem here
        }
    }
}
@Override
public Integer getProductStock(Integer id) {
    KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
    ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
        streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
    return keyValueStore.get(id);
}
However, the only way I know of to update the store is to publish a new
ProductReserved event, which the KStream then processes as a separate step
(a new Kafka message):
Map<String, Object> event = new HashMap<>();
event.put("name", "ProductReserved");
event.put("orderId", order.getId());
event.put("productId", order.getProductId());
event.put("quantity", -1);
// 3) Update the ProductStore
orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
This is the separate KStream config. Note // 3), where the update takes
place:
@Configuration
public class KStreamsConfig {

    @Bean
    public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder,
                                 KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
        Serde<Integer> integerSerde = Serdes.Integer();
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde =
            Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        KStream<Integer, JsonNode> stream =
            kStreamBuilder.stream(integerSerde, jsonSerde, "orders");

        // 3) Update the ProductStore
        stream.filter((key, value) -> value != null &&
                    value.get("name").asText().equals("ProductReserved"))
              .map((key, value) -> {
                  return new KeyValue<>(value.get("productId").asInt(),
                                        value.get("quantity").asInt());
              })
              .groupByKey()
              .reduce((v1, v2) -> v1 + v2, "ProductsStock");

        return stream;
    }
}
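As I understand it, the reduce in // 3) keeps a running sum of the quantity values per productId. Here is a minimal plain-Java sketch of that behaviour, with a HashMap standing in for the "ProductsStock" store. The class and method names are made up for illustration, and I am assuming the initial stock arrives as an event with a positive quantity, which the config above does not show.

```java
import java.util.HashMap;
import java.util.Map;

class StockReduceSketch {

    // Hypothetical stand-in for the "ProductsStock" store: productId -> summed quantity.
    // Each event is a {productId, quantity} pair.
    static Map<Integer, Integer> applyDeltas(int[][] events) {
        Map<Integer, Integer> store = new HashMap<>();
        for (int[] e : events) {
            // Same combining function as the reducer: (v1, v2) -> v1 + v2
            store.merge(e[0], e[1], (v1, v2) -> v1 + v2);
        }
        return store;
    }

    public static void main(String[] args) {
        // Assumption: product 1 is seeded with 10 units, then reserved twice.
        int[][] events = { {1, 10}, {1, -1}, {2, 5}, {1, -1} };
        System.out.println(applyDeltas(events).get(1)); // prints 8
    }
}
```

So the store only changes after a ProductReserved message has gone through the topic and been reduced, which is the second phase I would like to avoid.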
I've had a look at StateStoresInTheDSLIntegrationTest.java
<https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java>
but I still don't see how to integrate the update step in // 2). I have no
idea how to do all of this in the same phase:
- Consume a message
- Query a KStreams store
- Update the KStreams store
- Publish a ProductReserved message.
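To make the goal concrete, this is roughly the behaviour I am after, sketched in plain Java with no Kafka dependencies. A HashMap stands in for the "ProductsStock" store and a List stands in for the output topic; the class and method names are hypothetical. It is not Kafka Streams code, only the check, decrement and publish logic that I would like to run while handling a single OrderPlaced message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SinglePhaseReservationSketch {

    // Hypothetical stand-ins: a map for the store, a list for the output topic.
    private final Map<Integer, Integer> productsStock = new HashMap<>();
    private final List<Map<String, Object>> published = new ArrayList<>();

    SinglePhaseReservationSketch(Map<Integer, Integer> initialStock) {
        productsStock.putAll(initialStock);
    }

    // Handles one OrderPlaced event: query, update and publish in one step.
    // Returns true if a unit was reserved, false if the product is out of stock.
    boolean handleOrderPlaced(String orderId, int productId) {
        Integer stock = productsStock.get(productId);       // query the store
        if (stock == null || stock <= 0) {
            return false;
        }
        productsStock.put(productId, stock - 1);             // update the store

        Map<String, Object> event = new HashMap<>();         // publish ProductReserved
        event.put("name", "ProductReserved");
        event.put("orderId", orderId);
        event.put("productId", productId);
        event.put("quantity", -1);
        published.add(event);
        return true;
    }

    int stockOf(int productId) {
        return productsStock.getOrDefault(productId, 0);
    }

    List<Map<String, Object>> published() {
        return published;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> initial = new HashMap<>();
        initial.put(7, 1); // one unit of product 7 in stock
        SinglePhaseReservationSketch service = new SinglePhaseReservationSketch(initial);
        System.out.println(service.handleOrderPlaced("o-1", 7)); // prints true
        System.out.println(service.handleOrderPlaced("o-2", 7)); // prints false
    }
}
```

With the last unit reserved by the first order, the second order is rejected immediately, because the decrement happens before the next message is handled. What I am missing is the Kafka Streams equivalent of the writable map.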
Could you please outline the necessary code to do it?
Thank you so much.
Jose