My previous mail was in fact addressed to Ben, not Chris. Sorry for the
mistake.

Regards

On Sat, 22 Jul 2017 at 00:15, José Antonio Iñigo <
joseantonio.in...@gmail.com> wrote:

> Hi Chris,
>
> *"if I understand your problem correctly, the issue is that you need to*
> *decrement the stock count when you reserve it, rather than splitting it*
> *into a second phase."*
>
> That's exactly the problem, I would need to:
>
> 1) Read the OrderPlaced event from Kafka in ProductService...
> 2) ...query the ProductsStock store to check availability...
> 3) ...update the Product in the same phase (OrderPlacedEvent processing)
> 4) ...publish a ProductReserved message
>
> // 1) Read the OrderPlaced event...
> @StreamListener(OrderProcessor.INPUT)
> public void handleOrder(Map<String, Object> event) {
>     logger.info("Event {}", event);
>     if ("OrderPlaced".equals(event.get("name"))) {
>         Order order = new Order();
>         order.setId((String) event.get("orderId"));
>         order.setProductId((Integer) event.get("productId"));
>         order.setUid(event.get("uid").toString());
>         ...
>         // 2) Query the ProductsStockStore...
>         Integer productStock = getProductStock(order.getProductId());
>         if (productStock != null && productStock > 0) {
>             // 3) Update the ProductsStockStore
>             ???
>
>             // 4) Publish a new message. No problem here
>
>         }
>     }
> }
>
> @Override
> public Integer getProductStock(Integer id) {
>     KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
>     ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
>         streams.store("ProductsStock", QueryableStoreTypes.keyValueStore());
>     return keyValueStore.get(id);
> }
>
> However, the only way I know of to update the store is to publish a new
> ProductReserved event that will be processed by the KStream as a separate
> step (a new Kafka message):
>
>     Map<String, Object> event = new HashMap<>();
>     event.put("name", "ProductReserved");
>     event.put("orderId", order.getId());
>     event.put("productId", order.getProductId());
>     event.put("quantity", -1);
>     // 3) Update the ProductStore
>     orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
>
> This is the separate KStream config; notice // 3), where the update takes
> place:
>
> @Configuration
> public class KStreamsConfig {
>
>     @Bean
>     public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder,
>             KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
>
>         Serde<Integer> integerSerde = Serdes.Integer();
>         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
>         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
>         final Serde<JsonNode> jsonSerde =
>                 Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
>         KStream<Integer, JsonNode> stream =
>                 kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
>
>         // 3) Update the ProductStore
>         stream.filter((key, value) -> value != null
>                         && value.get("name").asText().equals("ProductReserved"))
>                 .map((key, value) -> new KeyValue<>(
>                         value.get("productId").asInt(),
>                         value.get("quantity").asInt()))
>                 .groupByKey()
>                 .reduce((v1, v2) -> v1 + v2, "ProductsStock");
>         return stream;
>     }
> }
>
> I've had a look at StateStoresInTheDSLIntegrationTest.java
> <https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java>
> but I still don't see how to integrate the update step in // 2). I have no
> idea how to do all of this in the same phase:
>
> - Consume a message
> - Query a KStreams store
> - Update the KStreams store
> - Publish a ProductReserved message.
>
> Could you please outline the necessary code to do it?
>
> Thank you so much.
> Jose
>
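
For illustration, the four steps listed in the quoted message (consume, query,
update, publish) can be collapsed into a single check-and-decrement step. The
following is a minimal plain-Java sketch of that control flow only. It does not
use the Kafka Streams or Spring Cloud Stream APIs: the class StockReservation
and its methods are hypothetical names, and a HashMap stands in for the
"ProductsStock" store. In a real topology, this logic would presumably have to
run inside a component that has write access to the state store, which is an
assumption and is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration; not part of Kafka Streams.
class StockReservation {

    // Stand-in for the "ProductsStock" store: productId -> units available.
    private final Map<Integer, Integer> stock = new HashMap<>();

    synchronized void setStock(int productId, int units) {
        stock.put(productId, units);
    }

    synchronized Integer getProductStock(int productId) {
        return stock.get(productId);
    }

    // Steps 2-4 in one phase: check availability, decrement the stock,
    // and build the ProductReserved event to publish.
    // Returns an empty Optional if the product is unknown or out of stock.
    synchronized Optional<Map<String, Object>> reserve(String orderId, int productId) {
        Integer available = stock.get(productId);
        if (available == null || available <= 0) {
            return Optional.empty();
        }
        // Update happens in the same step as the check, so no second
        // message is needed to change the count.
        stock.put(productId, available - 1);

        Map<String, Object> event = new HashMap<>();
        event.put("name", "ProductReserved");
        event.put("orderId", orderId);
        event.put("productId", productId);
        event.put("quantity", -1);
        return Optional.of(event);
    }
}
```

Because the check and the decrement happen under the same lock, two orders for
the last unit cannot both succeed. With one unit in stock, the first call to
reserve returns an event and the second returns an empty Optional.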
