Hi Ben,

Now I can see what you meant previously about using a Transformer. I was
following the wrong approach, dividing the processing between a Listener and
a Stream processor.

There's only one thing left that I don't know how to work out. This is a
draft of my code based on yours:

@Bean
@SuppressWarnings("unchecked")
public KStream<?, ?> kStream2(KStreamBuilder builder,
        KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
    final Serde<Integer> integerSerde = Serdes.Integer();
    final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    final Serde<JsonNode> jsonSerde =
            Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    KStream<String, JsonNode> unvalidatedOrdersStream =
            builder.stream(ORDERS_TOPIC);
    KStream<String, JsonNode> stockStream = builder.stream(PRODUCTS_TOPIC);

    StateStoreSupplier productStore = Stores.create(PRODUCTS_STORE)
            .withKeys(integerSerde)
            .withValues(jsonSerde)
            .persistent()
            .build();
    builder.addStateStore(productStore);

    ValueJoiner<JsonNode, JsonNode, Map<String, String>> valueJoiner =
            (value1, value2) -> new HashMap<>();

    KStream<String, Map<String, String>> orderOutputs =
            unvalidatedOrdersStream.<JsonNode, Map<String, String>>outerJoin(
                    stockStream, valueJoiner, JoinWindows.of(1000));

    orderOutputs.<String, Map<String, String>>transform(
                () -> new StockCountTransformer(), PRODUCTS_STORE)
            .filter((key, value) -> value != null)
            .to(ORDERS_TOPIC);

    return orderOutputs;
}
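Just to check my understanding of what StockCountTransformer should do per
event, this is the logic I think it boils down to, sketched in plain Java
with a HashMap standing in for the state store (the class name and the
"ProductReserved"/"ProductNoStock" result strings are only placeholders, not
real API):

```java
import java.util.HashMap;
import java.util.Map;

public class StockLogicSketch {
    // A HashMap standing in for the Kafka Streams KeyValueStore
    private final Map<Integer, Long> stockStore = new HashMap<>();

    // ProductAdded / StockUpdate branch: increment the stock level
    public void onStockUpdate(int productId, long amount) {
        long current = stockStore.getOrDefault(productId, 0L);
        stockStore.put(productId, current + amount);
    }

    // OrderPlaced branch: validate and decrement in the same step
    public String onOrderRequest(int productId, long quantity) {
        long current = stockStore.getOrDefault(productId, 0L);
        if (current - quantity >= 0) {
            stockStore.put(productId, current - quantity);
            return "ProductReserved";
        }
        return "ProductNoStock";
    }

    public long stockOf(int productId) {
        return stockStore.getOrDefault(productId, 0L);
    }

    public static void main(String[] args) {
        StockLogicSketch sketch = new StockLogicSketch();
        sketch.onStockUpdate(42, 2);                       // ProductAdded, qty 2
        System.out.println(sketch.onOrderRequest(42, 1));  // ProductReserved
        System.out.println(sketch.onOrderRequest(42, 1));  // ProductReserved
        System.out.println(sketch.onOrderRequest(42, 1));  // ProductNoStock
        System.out.println(sketch.stockOf(42));            // 0
    }
}
```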

There are two ways of updating the product store:
- ProductService has a REST endpoint that publishes ProductAdded events to
the products topic.
- OrderService sends an OrderPlaced event to the orders topic.

The problem now is that, if I understand it right, in order to update the
PRODUCTS_STORE there must be a join of an OrderPlaced event and a
ProductAdded event *within a certain join window*. If no Order and Product
events happen within that window, nothing is updated in the store. What's
more, ProductService should be able to update its store without having
anything to do with the orders, shouldn't it? I have tried publishing
ProductAdded events and nothing happens. Could you give me a hint about how
to deal with this?
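One thing I did wonder: could the outerJoin simply be replaced by merging the
two topics into a single stream, so every ProductAdded and OrderPlaced event
reaches the transformer (and the store) on its own, with no window involved?
Something like this, as a rough untested sketch against the same
KStreamBuilder API as above (VALIDATED_ORDERS_TOPIC is a made-up output
topic name):

```java
// Merge the two input streams instead of joining them; each event is
// processed independently, so no join window is needed.
KStream<String, JsonNode> merged =
        builder.merge(unvalidatedOrdersStream, stockStream);

merged.<String, Map<String, String>>transform(
            () -> new StockCountTransformer(), PRODUCTS_STORE)
        .filter((key, value) -> value != null)
        .to(VALIDATED_ORDERS_TOPIC); // hypothetical output topic
```

Would that be closer to what you meant?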

Thanks again for your time!!

2017-07-24 15:23 GMT+02:00 Ben Stopford <b...@confluent.io>:

> No worries Jose ;-)
>
> So there are a few ways you could do this, but I think it’s important that
> you manage a single “stock level” state store, backed by a changelog. Use
> this for validation, and keep it up to date at the same time. You should
> also ensure the input topic(s) are partitioned by productId so any update
> to, or validation of, the same product will be sequenced. This effectively
> ensures the mutations of the quantities in stock will be atomic.
>
> So say we have two inputs: OrderRequests, StockUpdates
>
> Order requests need to validate that there is sufficient stock, via the
> product store, then decrement the stock value in that store:
>
> public Event validateInventory(OrderRequestEvent order,
>                                KeyValueStore<ProductId, Long> store) {
>
>     Long stockCount = store.get(order.product);
>
>     if (stockCount - order.quantity >= 0) {
>         // decrement the value in the store
>         store.put(order.product, stockCount - order.quantity);
>         return new OrderValidatedEvent(Validation.Passed);
>     } else {
>         return new OrderValidatedEvent(Validation.Failed);
>     }
> }
>
> Stock updates need to increase the stock value in the product store as new
> stock arrives.
>
> public void updateStockStore(StockUpdateEvent update,
>                              KeyValueStore<ProductId, Long> store) {
>
>     Long current = store.get(update.product);
>
>     store.put(update.product, current + update.amount);
> }
>
> To do the processing we merge the input streams, then push this into a
> transformer that uses a single state store to manage the mapping between
> products and their stock levels.
>
> KStream<byte[], String> unvalidatedOrdersStream = builder.stream(orderTopic);
>
> KStream<byte[], String> stockStream = builder.stream(stockUpdateTopic);
>
> StateStoreSupplier productStore = Stores.create(productStoreName)...build()
>
> KStream<byte[], String> orderOutputs =
>     unvalidatedOrdersStream.outerJoin(stockStream, ...)
>         .transform(StockCheckTransformer::new, productStoreName)
>         .filter((key, value) -> !"".equals(value));
>
> orderOutputs.to(validatedOrdersStream);
>
>
> With the transformer both managing and validating against the stock levels.
>
> StockCountTransformer { ….
>
>     public KeyValue<byte[], Event> transform(ProductId key, Event event) {
>
>         if (event.isStockUpdate()) {
>             return KeyValue.pair(key,
>                 updateStockStore(parseStockUpdate(event), productStore));
>         } else if (event.isOrderRequest()) {
>             return KeyValue.pair(key,
>                 validateInventory(parseOrderReq(event), productStore));
>         }
>     }
> }
>
> Now the stock levels will be held in the changelog topic which backs the
> ProductStore which we can reuse if we wish.
>
> I think we could also optimise this code a bit by splitting into two
> transformers via streams.branch(..).
>
> Regarding EoS. This doesn’t add any magic to your processing logic. It just
> guarantees that your stock count will be accurate in the face of failure
> (i.e. you don’t need to manage idempotence yourself).
>
> B
>
>
> On Sat, Jul 22, 2017 at 12:52 PM José Antonio Iñigo <
> joseantonio.in...@gmail.com> wrote:
>
> > Hi Garret,
> >
> > At the moment, to simplify the problem I only have one topic, orders,
> where
> > I add products and decrement them based on ProductAdded and
> ProductReserved
> > events.
> >
> > Yesterday I was reading about EoS but I don't know if it'll solve the
> > problem. Dividing the query-update into two steps means that the event
> > ordering could be:
> >
> > OrderPlaced (query stock ok)
> > OrderPlaced (query stock ok)
> > ProductReserved (update stock)
> > ProductReserved (update stock)
> >
> > Regarding EoS this sequence is correct: the messages are delivered once,
> > in the order in which they were generated. The problem is the order
> > itself: if there were a way to query-update-store-generate-event in one
> > step, producing instead the following sequence of events, there wouldn't
> > be any problem:
> >
> > OrderPlaced->ProductReserved (query stock ok + update stock store +
> > reserved event)
> > OrderPlaced->ProductNoStock (query stock fail, so no update and an
> > out-of-stock event)
> >
> > Regards
> >
> > On Sat, 22 Jul 2017 at 05:35, Garrett Barton <garrett.bar...@gmail.com>
> > wrote:
> >
> > > Could you take in both topics via the same stream? Meaning don't do a
> > > Kafka Streams join; literally just read both streams. If KStream can't
> > > do this (dunno, haven't tried), then a simple upstream merge job to
> > > throw them into 1 topic with the same partitioning scheme.
> > >
> > > I'd assume you would have the products stream acting as some kind of
> > > incrementer on state (within the local state store). The Orders stream
> > > would act as a decrement to the same stream task. Exactly-once
> > > semantics, and you skirt the issue of having to wait for the update to
> > > come back around.
> > >
> > > Thoughts?
> > >
> > > On Fri, Jul 21, 2017 at 6:15 PM, José Antonio Iñigo <
> > > joseantonio.in...@gmail.com> wrote:
> > >
> > > > Hi Chris,
> > > >
> > > >
> > > >
> > > > *"if I understand your problem correctly, the issue is that you need
> > > > to decrement the stock count when you reserve it, rather than
> > > > splitting it into a second phase."*
> > > >
> > > > That's exactly the problem, I would need to:
> > > >
> > > > 1) Read the OrderPlaced event from Kafka in ProductService...
> > > > 2) ...query the ProductsStock store to check availability...
> > > > 3) ...update the Product in the same phase (OrderPlacedEvent
> > processing)
> > > > 4) ...publish a ProductReserved message
> > > >
> > > > // 1) Read the OrderPlaced event...
> > > > @StreamListener(OrderProcessor.INPUT)
> > > > public void handleOrder(Map<String, Object> event) {
> > > >     logger.info("Event {}", event);
> > > >     if ("OrderPlaced".equals(event.get("name"))) {
> > > >         Order order = new Order();
> > > >         order.setId((String) event.get("orderId"));
> > > >         order.setProductId((Integer) event.get("productId"));
> > > >         order.setUid(event.get("uid").toString());
> > > >         ...
> > > >         // 2) Query the ProductsStockStore...
> > > >         Integer productStock = getProductStock(order.getProductId());
> > > >         if (productStock != null && productStock > 0) {
> > > >             // 3) Update the ProductsStockStore
> > > >             ???
> > > >
> > > >             // 4) Publish a new message. No problem here
> > > >         }
> > > >     }
> > > > }
> > > >
> > > > @Override
> > > > public Integer getProductStock(Integer id) {
> > > >     KafkaStreams streams = kStreamBuilderFactoryBean.getKafkaStreams();
> > > >     ReadOnlyKeyValueStore<Integer, Integer> keyValueStore =
> > > >         streams.store("ProductsStock",
> > > >             QueryableStoreTypes.keyValueStore());
> > > >     return keyValueStore.get(id);
> > > > }
> > > >
> > > > However, the only way I know of updating the store is publishing a
> > > > new ProductReserved event that will be processed by the KStream as a
> > > > separate step (a new Kafka message):
> > > >
> > > > Map<String, Object> event = new HashMap<>();
> > > > event.put("name", "ProductReserved");
> > > > event.put("orderId", order.getId());
> > > > event.put("productId", order.getProductId());
> > > > event.put("quantity", -1);
> > > > // 3) Update the ProductStore
> > > > orderProcessor.output().send(
> > > >     MessageBuilder.withPayload(event).build(), 500);
> > > >
> > > > This is the separate KStream config; notice // 3), where the update
> > > > takes place:
> > > >
> > > > @Configuration
> > > > public class KStreamsConfig {
> > > >
> > > >     @Bean
> > > >     public KStream<?, ?> kStream(KStreamBuilder kStreamBuilder,
> > > >             KStreamBuilderFactoryBean kStreamBuilderFactoryBean) {
> > > >         Serde<Integer> integerSerde = Serdes.Integer();
> > > >         final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
> > > >         final Deserializer<JsonNode> jsonDeserializer =
> > > >                 new JsonDeserializer();
> > > >         final Serde<JsonNode> jsonSerde =
> > > >                 Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
> > > >         KStream<Integer, JsonNode> stream =
> > > >                 kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
> > > >
> > > >         // 3) Update the ProductStore
> > > >         stream.filter((key, value) -> value != null &&
> > > >                 value.get("name").asText().equals("ProductReserved"))
> > > >             .map((key, value) ->
> > > >                 new KeyValue<>(value.get("productId").asInt(),
> > > >                     value.get("quantity").asInt()))
> > > >             .groupByKey()
> > > >             .reduce((v1, v2) -> v1 + v2, "ProductsStock");
> > > >         return stream;
> > > >     }
> > > > }
> > > >
> > > > I've had a look at StateStoresInTheDSLIntegrationTest.java
> > > > <https://github.com/confluentinc/examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java>
> > > > but I still don't get how to integrate the update step in // 2). No
> > > > idea how I can do all this in the same phase:
> > > >
> > > > - Consume a message
> > > > - Query a KStreams store
> > > > - Update the KStreams store
> > > > - Publish a ProductReserved message.
> > > >
> > > > Could you please outline the necessary code to do it?
> > > >
> > > > Thank you so much.
> > > > Jose
> > > >
> > >
> >
>
