You can disable caching by setting cache size to zero. http://docs.confluent.io/current/streams/developer-guide.html#memory-management
-Matthias On 5/3/17 4:07 PM, Steven Schlansker wrote: > I'm designing a Streams application that provides an API that acts > on messages. Messages have a sender. > > I have a KStream<Id, Message> and a KTable<SenderId, Sender> > The first time a message is sent, you need to ensure the sender > exists beforehand. Roughly, > > void send(Message m) { > if (senderTable.get(m.getSenderId())) { > senderTopic.put(senderId, createSender(m)); > } > messageTopic.produce(m.getId(), m); > } > > Unfortunately, there is no ordering guarantee between these topics -- > it is entirely possible for message processing to happen before the sender > is created in the KTable. > > I'm trying to work around this, by essentially keeping a > CompletableFuture<Sender> > and doing: > > senderTable.foreach((senderId, sender) -> > awaiters.get(senderId).complete(sender))); > > void send(Message m) { > if (senderTable.get(m.getSenderId()) == null) { > senderTopic.put(senderId, createSender(m)); > awaiters.put(id, sender -> realSend(sender, m)); > } else { > realSend(sender, m); > } > } > > void realSend(MessageSender s, Message m) { > messageTopic.produce(m.getId(), m); > } > > Unfortunately, it seems that the KTable is wired up in a way that makes this > nearly impossible to implement well -- it unconditionally enables a > CachingKeyValueStore, which seems to only actually emit updates (and trigger > foreach) > in batch once per commit interval. This is terrible for the real-time > behavior I expect. > > Clearly I'm thinking about this problem wrong, but I'm not really sure what > the most effective > route is to fix my design. If the KTable was lower latency (disable > caching?) the window of > sadness gets much much smaller -- but fundamentally it seems very difficult > to track down > given a produced message M at what point you can guarantee that processor P > has observed message M > to enforce any sort of ordering guarantees. > > Anybody else wrestling with problems like this and have thoughts? Thanks in > advance. >
signature.asc
Description: OpenPGP digital signature