Hello Dmitry, Forwarding to downstream upon state store flushing is an internal implementation detail that is used by the DSL operators only, and hence we define related classes internals to abstract them away from the users.
For your case, one thing I need to clarify is for "flushing the store", is how that is related to your consistency requirement of the store? Do you mean flushing the cache to persist into the store or flushing the store (e.g. flush a rocksDB store) itself? Here are some things for you to notice: 1. A state store can be flushed by calling store.flush() programmatically, and if the store is a cached store it will also automatically flush the cache on top of it to make sure all dirty keys are persisted to the underlying storage. 2. A state store will also be flushed whenever the processor topology calls commit(), which can either be user-triggered (context.commit() ) or based on time period (there is a commit interval config). So if your goal is to achieve de-duping the downstream traffic, you can consider using punctuator to periodically flush the store and send the whole key-value map entries to downstream; if your goal is to only send to downstream when the cache is flushed, you can consider overriding the `flush()` function of the state store, that after the flush, send the whole key-value map entries to downstream. Guozhang On Thu, Feb 1, 2018 at 2:10 AM, Dmitry Minkovsky <dminkov...@gmail.com> wrote: > Right, but I want to forward messages to downstream processor nodes only > when the store flushes. How does that happen automatically > when KTableSourceProcessor sets that up to happen with a TupleForwarder? > > On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax <matth...@confluent.io> > wrote: > > > If you build a store and enable caching, you get the KTable behavior out > > of the box. No need to write any custom code in the processor itself. > > > > > > StoreBuilder builder = > > Stores.keyValueStoreBuilder(...).withCachingEnabled(); > > > > topology.addStateStore(builder, ...) > > > > > > > > -Matthias > > > > On 1/31/18 6:19 PM, Dmitry Minkovsky wrote: > > > I am writing a processor and I want its stores to behave like KTables: > > For > > > consistency, I don't want to forward values until the stores have been > > > flushed. > > > > > > I am looking at `ForwardingCacheFlushListener` and see that it is using > > > `InternalProcessorContext` to change the current node, perform a > forward, > > > and then set the node back. > > > > > > Now, I could do the same (`InternalProcessorContext` is public), but: > > > should I? I'm not well versed in the internals, so I am wondering what > > the > > > ramifications of this might be. Is this okay to do? Should I? > > > > > > > > -- -- Guozhang