Right, but I want to forward messages to downstream processor nodes only when the store flushes. How does that happen automatically when KTableSourceProcessor sets that up to happen with a TupleForwarder?
On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax <matth...@confluent.io> wrote: > If you build a store and enable caching, you get the KTable behavior out > of the box. No need to write any custom code in the processor itself. > > > StoreBuilder builder = > Stores.keyValueStoreBuilder(...).withCachingEnabled(); > > topology.addStateStore(builder, ...) > > > > -Matthias > > On 1/31/18 6:19 PM, Dmitry Minkovsky wrote: > > I am writing a processor and I want its stores to behave like KTables: > For > > consistency, I don't want to forward values until the stores have been > > flushed. > > > > I am looking at `ForwardingCacheFlushListener` and see that it is using > > `InternalProcessorContext` to change the current node, perform a forward, > > and then set the node back. > > > > Now, I could do the same (`InternalProcessorContext` is public), but: > > should I? I'm not well versed in the internals, so I am wondering what > the > > ramifications of this might be. Is this okay to do? Should I? > > > >