You can disable caching by setting the cache size to zero
(cache.max.bytes.buffering = 0 in your Streams configuration).
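
As a rough illustration of that setting, here is a minimal sketch that builds the configuration with plain java.util.Properties. The class name LowLatencyStreamsConfig and its build method are made up for this example; the property keys are the standard Kafka Streams ones (the cache key is also available as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG). The application id and broker address are placeholders you would supply yourself.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the settings in one place.
final class LowLatencyStreamsConfig {

    // Returns Streams properties with the record cache turned off.
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        // Same key as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
        // Zero bytes of cache means KTable updates are not buffered
        // and deduplicated before being forwarded downstream.
        props.put("cache.max.bytes.buffering", 0L);
        return props;
    }
}
```

The resulting Properties object would be passed to the KafkaStreams constructor as usual. With the cache at zero, each update should reach downstream operators such as foreach as it is processed instead of waiting for the next cache flush, at the cost of more downstream records and more writes to the state store and changelog.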


On 5/3/17 4:07 PM, Steven Schlansker wrote:
> I'm designing a Streams application that provides an API that acts
> on messages.  Messages have a sender.
> I have a KStream<Id, Message> and a KTable<SenderId, Sender>
> The first time a message is sent, you need to ensure the sender
> exists beforehand.  Roughly,
> void send(Message m) {
>     // Create the sender first if the table does not know it yet.
>     if (senderTable.get(m.getSenderId()) == null) {
>         senderTopic.put(m.getSenderId(), createSender(m));
>     }
>     messageTopic.produce(m.getId(), m);
> }
> Unfortunately, there is no ordering guarantee between these topics --
> it is entirely possible for message processing to happen before the sender
> is created in the KTable.
> I'm trying to work around this by essentially keeping a
> CompletableFuture<Sender> and doing:
> senderTable.foreach((senderId, sender) ->
>     awaiters.get(senderId).complete(sender));
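> void send(Message m) {
>     Sender sender = senderTable.get(m.getSenderId());
>     if (sender == null) {
>         senderTopic.put(m.getSenderId(), createSender(m));
>         // Defer the real send until the sender shows up in the KTable.
>         awaiters.computeIfAbsent(m.getSenderId(), id -> new CompletableFuture<>())
>                 .thenAccept(s -> realSend(s, m));
>     } else {
>         realSend(sender, m);
>     }
> }
> void realSend(Sender s, Message m) {
>     messageTopic.produce(m.getId(), m);
> }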
> Unfortunately, it seems that the KTable is wired up in a way that makes this
> nearly impossible to implement well -- it unconditionally enables a
> CachingKeyValueStore, which seems to only actually emit updates (and trigger
> foreach) in batch once per commit interval.  This is terrible for the
> real-time behavior I expect.
> Clearly I'm thinking about this problem wrong, but I'm not really sure what
> the most effective route is to fix my design.  If the KTable was lower
> latency (disable caching?) the window of sadness gets much much smaller --
> but fundamentally it seems very difficult to track down given a produced
> message M at what point you can guarantee that processor P has observed
> message M to enforce any sort of ordering guarantees.
> Anybody else wrestling with problems like this and have thoughts?  Thanks in
> advance.
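
For reference, the awaiter idea from the quoted message can be sketched in plain Java roughly as follows. This is only an illustration under simplifying assumptions: SenderAwaiters and its methods are hypothetical names, ids and senders are plain Strings, and the "real send" just appends to a list where the actual application would produce to the message topic. onSenderUpdate stands in for the callback you would register with the KTable's foreach.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: defers sends until the sender has been observed.
final class SenderAwaiters {
    // One future per sender id; completed once the sender is seen.
    private final ConcurrentMap<String, CompletableFuture<String>> awaiters =
            new ConcurrentHashMap<>();
    // Stand-in for producing to the message topic.
    final List<String> delivered = new CopyOnWriteArrayList<>();

    // Would be called from the KTable foreach callback.
    void onSenderUpdate(String senderId, String sender) {
        awaiters.computeIfAbsent(senderId, id -> new CompletableFuture<>())
                .complete(sender);
    }

    // Runs the real send now if the sender is known, otherwise later.
    void send(String senderId, String message) {
        awaiters.computeIfAbsent(senderId, id -> new CompletableFuture<>())
                .thenAccept(sender -> realSend(sender, message));
    }

    private void realSend(String sender, String message) {
        delivered.add(sender + ":" + message);
    }
}
```

Two caveats with this sketch: CompletableFuture does not promise that several pending callbacks run in registration order, so messages queued before the sender appears may need an explicit queue if their order matters, and the map is never pruned. It also only narrows the window described above; it does not by itself give any ordering guarantee between the two topics.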
