Hi Ben, now I can see what you meant previously about using a Transformer. I was following a wrong approach dividing the processing between a Listener and a Stream processor.
There's only one thing left that I don't know how to work out, this a draft of my code based on yours: @Bean @SuppressWarnings("unchecked") public KStream<?, ?> kStream2(KStreamBuilder builder, KStreamBuilderFactoryBean kStreamBuilderFactoryBean) { final Serde<Integer> integerSerde = Serdes.Integer(); final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); KStream<String, JsonNode> unvalidatedOrdersStream = builder.stream(ORDERS_TOPIC); KStream<String, JsonNode> stockStream = builder.stream(PRODUCTS_TOPIC); StateStoreSupplier<StateStore> productStore = Stores.create(PRODUCTS_STORE) .withKeys(integerSerde) .withValues(jsonSerde) .persistent() .build(); builder.addStateStore(productStore); ValueJoiner<JsonNode, JsonNode, Map<String, String>> valueJoiner = (JsonNode value1, JsonNode value2) -> new HashMap<>(); stockStream.branch(predicates) KStream<String, Map<String, String>> orderOutputs = unvalidatedOrdersStream.<JsonNode, Map<String, String>>outerJoin(stockStream, valueJoiner, JoinWindows.of(1000)); orderOutputs.<String, Map<String, String>>transform(() -> new StockCountTransformer(), PRODUCTS_STORE) .filter((key, value) -> { return value != null; }).to(ORDERS_TOPIC); return orderOutputs; } There are two ways of updating the product store: - ProductService has a REST endpoint that publishes ProductAdded events to product topic - OrderService sends a OrderPlaced event to the orders topic. The problem now is that, if I understand it right, in order to update the PRODUCTS_STORE there must be a join of an OrderPlaced event and a ProductAdded event *in a certain join window*. If there aren't Order and Product events that happen within a time window nothing will be updated in the store. What's more, ProductService shoud be able to update its store without having anything to do with the orders, shouldn't it? I have tried publishing ProductAdded events and nothing happens. Could you give me a hint about how to deal with this? Thanks again for your time!! 2017-07-24 15:23 GMT+02:00 Ben Stopford <b...@confluent.io>: > No worries Jose ;-) > > So there are a few ways you could do this, but I think it’s important that > you manage a single “stock level” state store, backed by a changelog. Use > this for validation, and keep it up to date at the same time. You should > also ensure the input topic(s) are partitioned by productId so any update > to, or validation of, the same product will be sequenced. This effectively > ensures the mutations of the quantities in stock will be atomic. > > So say we have two inputs: OrderRequests, StockUpdates > > Order requests need to validate that there is sufficient stock, via the > product store, then decrement the stock value in that store: > > public Event validateInventory(OrderRequestEvent order, KeyValueStore<> > store){ > > Long stockCount = store.get(order.product); > > if (stockCount - order.quantity >= 0) { > > //decrement the value in the store > > store.put(order.product, stockCount - order.amount); > > return new OrderValidatedEvent(Validation.Passed); > > } else > > return new OrderValidatedEvent(Validation.Failed); > > } > > Stock updates need to increase the stock value in the product store as new > stock arrives. > > public void updateStockStore(StockUpdateEvent update, KeyValueStore<> > store){ > > Long current = update.get(update.product); > > store.put(update.product, current + update.amount); > > } > > To do the processing we merge input streams, then push this into a > transfomer, that uses a single state store to manage the mapping between > products and their stock levels. > > KStream<byte[], String> unvalidatedOrdersStream = > builder.stream(orderTopic); > > KStream<byte[], String> stockStream = builder.stream(stockUpdateTopic); > > StateStoreSupplier productStore = Stores.create( > productStoreName)...build() > > KStream<byte[], String> orderOutputs = > > unvalidatedOrdersStream.outerJoin(stockStream, ...) > > .transform(StockCheckTransformer::new, productStoreName) > > .filter((key, value) -> value != ""); > > orderOutputs.to(validatedOrdersStream); > > > With the transformer both managing and validating against the stock levels. > > StockCountTransformer { …. > > public KeyValue<byte[], Event> transform(ProductId key, Event event) > > if (event.isStockUpdate()) { > > Stock update = parseStock(value); > > return KeyValue.pair(key, > > updateStockStore(parseStockUpdate(event), productStore)) > > } else if (event.isOrderRequest()) { > > return KeyValue.pair(key, > > validateInventory(parseOrderReq(event), productStore)) > > } > > } > > } > > Now the stock levels will be held in the changelog topic which backs the > ProductStore which we can reuse if we wish. > > I think we could also optimise this code a bit by splitting into two > transformers via streams.branch(..). > > Regarding EoS. This doesn’t add any magic to your processing logic. It just > guarantees that your stock count will be accurate in the face of failure > (i.e. you don’t need to manage idempotence yourself). > > B > > > On Sat, Jul 22, 2017 at 12:52 PM José Antonio Iñigo < > joseantonio.in...@gmail.com> wrote: > > > Hi Garret, > > > > At the moment, to simplify the problem I only have one topic, orders, > where > > I add products and decrement them based on ProductAdded and > ProductReserved > > events. > > > > Yeaterday I was reading about EoS but I don't know if it'll solve the > > problem. Dividing the query-update in two steps means that the event > > ordering could be: > > > > OrderPlaced (query stock ok) > > OrderPlaced (query stock ok) > > ProductReserved (update stock) > > ProductReserved (update stock) > > > > Regarding EoS this sequence is correct, the messages are delivered once > in > > the order in which they were generated. The problem is the order itself: > if > > there were a way to query-update-store-generate-event in one step to > > produce instead the following sequence of events there wouldn't be any > > problem: > > > > OrderPlaced->ProductReserved (query stock ok + Update stock store + > > reserved event) > > OrderPlaced->ProductNoStock (query stock fail so no update and > out-of-stock > > event) > > > > Regards > > > > On Sat, 22 Jul 2017 at 05:35, Garrett Barton <garrett.bar...@gmail.com> > > wrote: > > > > > Could you take in both topics via the same stream? Meaning don't do a > > kafka > > > streams join, literally just read both streams. If KStream cant do > this, > > > dunno haven't tried, then simple upstream merge job to throw them into > 1 > > > topic with same partitioning scheme. > > > > > > I'd assume you would have the products stream that would be some kind > of > > > incrementer on state (within the local state store). The Orders stream > > > would act as a decrement to the same stream task. Exactly once > semantics > > > and you skirt the issue of having to wait for the update to come back > > > around. > > > > > > Thoughts? > > > > > > On Fri, Jul 21, 2017 at 6:15 PM, José Antonio Iñigo < > > > joseantonio.in...@gmail.com> wrote: > > > > > > > Hi Chris, > > > > > > > > > > > > > > > > *"if I understand your problem correctly, the issue is that you need > > > > todecrement the stock count when you reserve it, rather than > splitting > > > it* > > > > *into a second phase."* > > > > > > > > That's exactly the problem, I would need to: > > > > > > > > 1) Read the OrderPlaced event from Kafka in ProductService... > > > > 2) ...query the ProductsStock store to check availability... > > > > 3) ...update the Product in the same phase (OrderPlacedEvent > > processing) > > > > 4) ...publish a ProductReserved message > > > > > > > > // 1) Read the OrderPlaced event... > > > > @StreamListener(OrderProcessor.INPUT) > > > > public void handleOrder(Map<String, Object> event){ > > > > logger.info("Event {}", event); > > > > if("OrderPlaced".equals(event.get("name"))){ > > > > Order order = new Order(); > > > > order.setId((String)event.get("orderId")); > > > > order.setProductId((Integer)(event.get("productId"))); > > > > order.setUid(event.get("uid").toString()); > > > > ... > > > > // 2) Query the ProductsStockStore... > > > > Integer productStock = > > > > getProductStock(order.getProductId()); > > > > if(productStock != null && productStock > 0) { > > > > // 3) Update the ProductsStockStore > > > > ??? > > > > > > > > // 4) Publish a new message. No problem > > here > > > > > > > > } > > > > > > > > @Override > > > > public Integer getProductStock(Integer id) { > > > > KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams(); > > > > ReadOnlyKeyValueStore<Integer, Integer> keyValueStore = > > > > streams.store("ProductsStock", QueryableStoreTypes. > keyValueStore()); > > > > return keyValueStore.get(id); > > > > } > > > > > > > > However the only way I know of updating the store is publishing a new > > > event > > > > ProductReserved that will be processed by the KStream as a separated > > step > > > > (new Kafka message): > > > > > > > > Map<String, Object> event = new HashMap<>(); > > > > event.put("name", "ProductReserved"); > > > > event.put("orderId", order.getId()); > > > > event.put("productId", order.getProductId()); > > > > event.put("quantity", -1); > > > > // 3) Update the ProductStore > > > > orderProcessor.output().send(MessageBuilder.withPayload( > > > > event).build(), > > > > 500); > > > > > > > > This is the separated KStream config notice // 3) where the update > > takes > > > > place: > > > > > > > > @Configuration > > > > public class KStreamsConfig { > > > > > > > > @Bean > > > > public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder, > > > > KStreamBuilderFactoryBean kStreamBuilderFactoryBean) { > > > > > > > > Serde<Integer> integerSerde = Serdes.Integer(); > > > > final Serializer<JsonNode> jsonSerializer = new > > > > JsonSerializer(); > > > > final Deserializer<JsonNode> jsonDeserializer = new > > > > JsonDeserializer(); > > > > final Serde<JsonNode> jsonSerde = > > > > Serdes.serdeFrom(jsonSerializer, jsonDeserializer); > > > > KStream<Integer, JsonNode> stream = > > > kStreamBuilder.stream(integerSerde, > > > > jsonSerde, "orders"); > > > > > > > > // 3) Update the ProductStore > > > > stream.filter( (key, value) -> value != null && > > > > value.get("name").asText().equals("ProductReserved")) > > > > .map( (key, value) -> { > > > > return new KeyValue<>(value.get("productId").asInt(), > > > > value.get("quantity").asInt()); > > > > }).groupByKey().reduce( (v1, v2) -> v1 + v2, "ProductsStock"); > > > > return stream; > > > > } > > > > } > > > > > > > > I've had a look at the StateStoresInTheDSLIntegrationTest.java > > > > <https://github.com/confluentinc/examples/blob/ > > > > master/kafka-streams/src/test/java/io/confluent/examples/streams/ > > > > StateStoresInTheDSLIntegrationTest.java> > > > > but > > > > I still don't get how to integrate the update step in // 2). No idea > > how > > > I > > > > can do all this in the same phase: > > > > > > > > - Consume a message > > > > - Query a KStreams store > > > > - Update the KStreams store > > > > - Publish a ProductReserved message. > > > > > > > > Could you please outline the necessary code to do it? > > > > > > > > Thank you so much. > > > > Jose > > > > > > > > > >