Thank you Guozhang.

> related to your consistency requirement of the store? Do you mean
> flushing the cache to persist into the store or flushing the store

Yes, motivated by consistency, I want to forward the state downstream only
after the LRU cache is persisted into the store on disk and the store's
changelog topic has been replicated.

> So if your goal is to achieve de-duping the downstream traffic

I am trying to make sure, to whatever degree is supported by the library
now, that downstream processors don't see a message that is the result of
state that is possibly inconsistent.

Thank you for describing those mechanisms. I will investigate them.

On Thu, Feb 1, 2018 at 1:53 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Dmitry,
>
> Forwarding downstream upon state store flush is an internal
> implementation detail used by the DSL operators only, and hence we
> define the related classes as internal to abstract them away from users.
>
> For your case, one thing I need to clarify is, for "flushing the store",
> how that is related to your consistency requirement of the store. Do you
> mean flushing the cache to persist into the store, or flushing the store
> (e.g. flushing a RocksDB store) itself? Here are some things for you to
> note:
>
> 1. A state store can be flushed by calling store.flush()
> programmatically, and if the store is a cached store this will also
> automatically flush the cache on top of it, to make sure all dirty keys
> are persisted to the underlying storage.
>
> 2. A state store will also be flushed whenever the processor topology
> calls commit(), which can either be user-triggered (context.commit()) or
> based on a time period (there is a commit interval config).
>
> So if your goal is to achieve de-duping the downstream traffic, you can
> consider using a punctuator to periodically flush the store and send all
> of the store's key-value entries downstream; if your goal is to only
> send downstream when the cache is flushed, you can consider overriding
> the `flush()` function of the state store so that, after the flush, it
> sends all of the store's key-value entries downstream.
>
>
> Guozhang
>
>
>
> On Thu, Feb 1, 2018 at 2:10 AM, Dmitry Minkovsky <dminkov...@gmail.com>
> wrote:
>
> > Right, but I want to forward messages to downstream processor nodes
> > only when the store flushes. How does that happen automatically when
> > KTableSourceProcessor sets that up to happen with a TupleForwarder?
> >
> > On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > If you build a store and enable caching, you get the KTable behavior
> > > out of the box. No need to write any custom code in the processor
> > > itself.
> > >
> > > StoreBuilder builder =
> > >     Stores.keyValueStoreBuilder(...).withCachingEnabled();
> > >
> > > topology.addStateStore(builder, ...)
> > >
> > >
> > > -Matthias
> > >
> > > On 1/31/18 6:19 PM, Dmitry Minkovsky wrote:
> > > > I am writing a processor and I want its stores to behave like
> > > > KTables: for consistency, I don't want to forward values until the
> > > > stores have been flushed.
> > > >
> > > > I am looking at `ForwardingCacheFlushListener` and see that it uses
> > > > `InternalProcessorContext` to change the current node, perform a
> > > > forward, and then set the node back.
> > > >
> > > > Now, I could do the same (`InternalProcessorContext` is public),
> > > > but should I? I'm not well versed in the internals, so I am
> > > > wondering what the ramifications of this might be. Is this okay to
> > > > do? Should I?
>
> --
> -- Guozhang
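
A minimal sketch of the punctuator approach Guozhang describes above,
written against the Kafka 1.0 Processor API. The store name "my-store",
the String key/value types, and the 10-second interval are illustrative
placeholders, not details from the thread:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class FlushForwardProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        // "my-store" is a placeholder; it must match the name the store
        // was registered under via topology.addStateStore(...).
        this.store = (KeyValueStore<String, String>) context.getStateStore("my-store");

        // Periodically flush the store (per point 1 above, flushing a
        // cached store also flushes the cache on top of it), and only
        // then forward the current entries downstream.
        context.schedule(10_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            store.flush();
            try (KeyValueIterator<String, String> iter = store.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, String> entry = iter.next();
                    context.forward(entry.key, entry.value);
                }
            }
        });
    }

    @Override
    public void process(final String key, final String value) {
        // Only update the store here; forwarding happens in the punctuator.
        store.put(key, value);
    }

    @Override
    @Deprecated
    public void punctuate(final long timestamp) {
        // No-op: superseded by the Punctuator registered in init().
    }

    @Override
    public void close() {}
}

The store itself would be built and registered as in Matthias's snippet,
with withCachingEnabled() on the StoreBuilder and the builder passed to
topology.addStateStore() before connecting it to this processor.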