I was somehow not aware of this: https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams ... :/
On Thu, Feb 1, 2018 at 11:57 PM, Dmitry Minkovsky <dminkov...@gmail.com> wrote:

> Thank you Guozhang.
>
> > related to your consistency requirement of the store? Do you mean
> > flushing the cache to persist into the store or flushing the store
>
> Yes, motivated by consistency, I want to forward the state downstream only
> after LRU cache is persisted into the store on disk, and the store's
> changelog topic has been replicated.
>
> > So if your goal is to achieve de-duping the downstream traffic
>
> I am trying to make sure, to whatever degree is supposed by the library
> now, that downstream processors don't see a message that is the result of
> state that is possibly inconsistent.
>
> Thank you for describing those mechanisms. I will investigate them.
>
> On Thu, Feb 1, 2018 at 1:53 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Dmitry,
>>
>> Forwarding to downstream upon state store flushing is an internal
>> implementation detail that is used by the DSL operators only, and hence we
>> define related classes internals to abstract them away from the users.
>>
>> For your case, one thing I need to clarify is for "flushing the store", is
>> how that is related to your consistency requirement of the store? Do you
>> mean flushing the cache to persist into the store or flushing the store
>> (e.g. flush a rocksDB store) itself? Here are some things for you to
>> notice:
>>
>> 1. A state store can be flushed by calling store.flush() programmatically,
>>    and if the store is a cached store it will also automatically flush the
>>    cache on top of it to make sure all dirty keys are persisted to the
>>    underlying storage.
>> 2. A state store will also be flushed whenever the processor topology calls
>>    commit(), which can either be user-triggered (context.commit()) or based
>>    on time period (there is a commit interval config).
>>
>> So if your goal is to achieve de-duping the downstream traffic, you can
>> consider using punctuator to periodically flush the store and send the
>> whole key-value map entries to downstream; if your goal is to only send to
>> downstream when the cache is flushed, you can consider overriding the
>> `flush()` function of the state store, that after the flush, send the
>> whole key-value map entries to downstream.
>>
>>
>> Guozhang
>>
>> On Thu, Feb 1, 2018 at 2:10 AM, Dmitry Minkovsky <dminkov...@gmail.com>
>> wrote:
>>
>> > Right, but I want to forward messages to downstream processor nodes only
>> > when the store flushes. How does that happen automatically
>> > when KTableSourceProcessor sets that up to happen with a TupleForwarder?
>> >
>> > On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> > > If you build a store and enable caching, you get the KTable behavior out
>> > > of the box. No need to write any custom code in the processor itself.
>> > >
>> > >
>> > > StoreBuilder builder =
>> > >     Stores.keyValueStoreBuilder(...).withCachingEnabled();
>> > >
>> > > topology.addStateStore(builder, ...)
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 1/31/18 6:19 PM, Dmitry Minkovsky wrote:
>> > > > I am writing a processor and I want its stores to behave like KTables:
>> > > > For consistency, I don't want to forward values until the stores have
>> > > > been flushed.
>> > > >
>> > > > I am looking at `ForwardingCacheFlushListener` and see that it is using
>> > > > `InternalProcessorContext` to change the current node, perform a
>> > > > forward, and then set the node back.
>> > > >
>> > > > Now, I could do the same (`InternalProcessorContext` is public), but:
>> > > > should I? I'm not well versed in the internals, so I am wondering what
>> > > > the ramifications of this might be. Is this okay to do? Should I?
>>
>> --
>> -- Guozhang
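
For anyone reading this thread later, the behaviour discussed above (buffer writes, persist on flush, and only then forward) can be modelled without any Kafka classes. This is only a rough sketch under stated assumptions: `FlushForwardingCache`, its `store` map, and its `downstream` callback are hypothetical names invented for illustration, and nothing here is the Kafka Streams API or its internal implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/**
 * Toy model of a write-back cache in front of a store.
 * NOT the Kafka Streams API: all names here are hypothetical.
 */
class FlushForwardingCache<K, V> {
    // Pending writes; a later put() for the same key overwrites the earlier one.
    private final Map<K, V> dirty = new LinkedHashMap<>();
    // Stands in for the persistent store (e.g. a RocksDB-backed store).
    private final Map<K, V> store;
    // Stands in for forwarding a record to downstream processors.
    private final BiConsumer<K, V> downstream;

    FlushForwardingCache(Map<K, V> store, BiConsumer<K, V> downstream) {
        this.store = store;
        this.downstream = downstream;
    }

    /** Buffer a write; nothing is persisted or forwarded yet. */
    void put(K key, V value) {
        dirty.put(key, value);
    }

    /** Read-your-writes: consult the dirty entries first, then the store. */
    V get(K key) {
        return dirty.containsKey(key) ? dirty.get(key) : store.get(key);
    }

    /** Persist all dirty entries first, then forward each one downstream. */
    void flush() {
        store.putAll(dirty);
        // Copy before clearing so a downstream callback that calls put()
        // cannot modify the map while it is being iterated.
        Map<K, V> flushed = new LinkedHashMap<>(dirty);
        dirty.clear();
        flushed.forEach(downstream);
    }
}
```

In this model, two `put()` calls for the same key followed by one `flush()` produce a single downstream record carrying the latest value, and that record is emitted only after the store has been updated. Calling `flush()` from a periodic punctuator or at commit time, as suggested above, would be the trigger in a real processor; changelog replication is outside what this sketch models.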