Thank you, Guozhang.

> how it relates to your consistency requirement. Do you mean flushing the
> cache so it persists into the store, or flushing the store itself

Yes, motivated by consistency, I want to forward the state downstream only
after the LRU cache has been persisted into the store on disk and the
store's changelog topic has been replicated.

> So if your goal is to de-dupe the downstream traffic

I am trying to make sure, to whatever degree is supported by the library
now, that downstream processors don't see a message that is the result of
possibly inconsistent state.

Thank you for describing those mechanisms. I will investigate them.
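
For the archives, here is the rough shape I have in mind for the
flush-override approach Guozhang describes below. The wrapper class and its
name are my own sketch, not an existing API, and it only covers the
local-flush half of my requirement; changelog replication is a separate
question:

import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: a delegating store whose flush() forwards the store's contents
// downstream only after the cache and underlying store have been persisted.
public class ForwardOnFlushStore<K, V> implements KeyValueStore<K, V> {

    private final KeyValueStore<K, V> inner;
    private final ProcessorContext context;

    public ForwardOnFlushStore(final KeyValueStore<K, V> inner,
                               final ProcessorContext context) {
        this.inner = inner;
        this.context = context;
    }

    @Override
    public void flush() {
        inner.flush();  // dirty cache entries and the store are persisted first
        // Caveat: forward() outside process()/punctuate() may fail unless the
        // current node is set -- the InternalProcessorContext question that
        // started this thread.
        try (KeyValueIterator<K, V> it = inner.all()) {
            while (it.hasNext()) {
                final KeyValue<K, V> entry = it.next();
                context.forward(entry.key, entry.value);
            }
        }
    }

    // Everything else is straight delegation.
    @Override public String name() { return inner.name(); }
    @Override public void init(final ProcessorContext ctx, final StateStore root) { inner.init(ctx, root); }
    @Override public void close() { inner.close(); }
    @Override public boolean persistent() { return inner.persistent(); }
    @Override public boolean isOpen() { return inner.isOpen(); }
    @Override public V get(final K key) { return inner.get(key); }
    @Override public void put(final K key, final V value) { inner.put(key, value); }
    @Override public V putIfAbsent(final K key, final V value) { return inner.putIfAbsent(key, value); }
    @Override public void putAll(final List<KeyValue<K, V>> entries) { inner.putAll(entries); }
    @Override public V delete(final K key) { return inner.delete(key); }
    @Override public KeyValueIterator<K, V> range(final K from, final K to) { return inner.range(from, to); }
    @Override public KeyValueIterator<K, V> all() { return inner.all(); }
    @Override public long approximateNumEntries() { return inner.approximateNumEntries(); }
}

The remaining question is whether forwarding from inside flush() is safe
without the InternalProcessorContext tricks that started this thread.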

On Thu, Feb 1, 2018 at 1:53 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Dmitry,
>
> Forwarding downstream upon state store flushing is an internal
> implementation detail used by the DSL operators only, and hence we keep the
> related classes internal to abstract them away from users.
>
> For your case, one thing I need to clarify about "flushing the store" is
> how it relates to your consistency requirement. Do you mean flushing the
> cache so it persists into the store, or flushing the store itself (e.g.
> flushing a RocksDB store)? Here are some things to notice:
>
> 1. A state store can be flushed by calling store.flush() programmatically;
> if it is a cached store, this will also flush the cache on top of it to
> make sure all dirty keys are persisted to the underlying storage.
> 2. A state store will also be flushed whenever the processor topology
> commits, which can either be user-triggered (context.commit()) or based on
> a time period (the commit.interval.ms config).
>
> So if your goal is to de-dupe the downstream traffic, you can consider
> using a punctuator to periodically flush the store and send all of its
> key-value entries downstream; if your goal is to send downstream only when
> the cache is flushed, you can consider overriding the `flush()` function of
> the state store so that, after the flush, it sends all of its key-value
> entries downstream.
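>
> As a rough illustration of the punctuator variant (the class and store
> names are mine, and the one-minute interval is arbitrary):
>
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.processor.Processor;
> import org.apache.kafka.streams.processor.ProcessorContext;
> import org.apache.kafka.streams.processor.PunctuationType;
> import org.apache.kafka.streams.state.KeyValueIterator;
> import org.apache.kafka.streams.state.KeyValueStore;
>
> public class FlushAndForwardProcessor implements Processor<String, String> {
>
>     private ProcessorContext context;
>     private KeyValueStore<String, String> store;
>
>     @Override
>     @SuppressWarnings("unchecked")
>     public void init(final ProcessorContext context) {
>         this.context = context;
>         this.store = (KeyValueStore<String, String>) context.getStateStore("my-store");
>         // Every 60 seconds: flush the store, then forward its entries downstream.
>         context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
>             store.flush();
>             try (KeyValueIterator<String, String> it = store.all()) {
>                 while (it.hasNext()) {
>                     final KeyValue<String, String> entry = it.next();
>                     context.forward(entry.key, entry.value);
>                 }
>             }
>         });
>     }
>
>     @Override
>     public void process(final String key, final String value) {
>         store.put(key, value);  // only buffer here; forwarding happens in the punctuator
>     }
>
>     @Override
>     @Deprecated
>     public void punctuate(final long timestamp) { }  // old API, unused
>
>     @Override
>     public void close() { }
> }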
>
>
> Guozhang
>
>
>
> On Thu, Feb 1, 2018 at 2:10 AM, Dmitry Minkovsky <dminkov...@gmail.com>
> wrote:
>
> > Right, but I want to forward messages to downstream processor nodes only
> > when the store flushes. How does that happen automatically when
> > KTableSourceProcessor sets it up with a TupleForwarder?
> >
> > On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > If you build a store and enable caching, you get the KTable behavior out
> > > of the box. No need to write any custom code in the processor itself.
> > >
> > >
> > > StoreBuilder builder =
> > >     Stores.keyValueStoreBuilder(...).withCachingEnabled();
> > >
> > > topology.addStateStore(builder, ...);
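> > >
> > > For concreteness, a filled-in version might look like this (the store
> > > name, serdes, and processor name are just examples):
> > >
> > > import org.apache.kafka.common.serialization.Serdes;
> > > import org.apache.kafka.streams.state.KeyValueStore;
> > > import org.apache.kafka.streams.state.StoreBuilder;
> > > import org.apache.kafka.streams.state.Stores;
> > >
> > > StoreBuilder<KeyValueStore<String, String>> builder =
> > >     Stores.keyValueStoreBuilder(
> > >             Stores.persistentKeyValueStore("my-store"),
> > >             Serdes.String(),
> > >             Serdes.String())
> > >         .withCachingEnabled();
> > >
> > > topology.addStateStore(builder, "my-processor");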
> > >
> > >
> > >
> > > -Matthias
> > >
> > > On 1/31/18 6:19 PM, Dmitry Minkovsky wrote:
> > > > I am writing a processor and I want its stores to behave like KTables:
> > > > For consistency, I don't want to forward values until the stores have
> > > > been flushed.
> > > >
> > > > I am looking at `ForwardingCacheFlushListener` and see that it is using
> > > > `InternalProcessorContext` to change the current node, perform a
> > > > forward, and then set the node back.
> > > >
> > > > Now, I could do the same (`InternalProcessorContext` is public), but:
> > > > should I? I'm not well versed in the internals, so I am wondering what
> > > > the ramifications of this might be. Is this okay to do? Should I?
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
