Hello Dmitry,

Forwarding downstream when a state store is flushed is an internal
implementation detail used only by the DSL operators, so we define the
related classes as internal to hide them from users.

For your case, one thing I need to clarify: how is "flushing the store"
related to your consistency requirement? Do you mean flushing the cache so
that its entries are persisted into the store, or flushing the store itself
(e.g. flushing a RocksDB store)? Here are some things to note:

1. A state store can be flushed by calling store.flush() programmatically,
and if the store is a cached store it will also automatically flush the
cache on top of it to make sure all dirty keys are persisted to the
underlying storage.
2. A state store will also be flushed whenever the processor topology
commits, which can be either user-triggered (context.commit()) or
time-based (controlled by the commit interval config, commit.interval.ms).
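
To make point 1 concrete, here is a toy model in plain Java of a cache
sitting on top of a store. It is only a sketch and not the Kafka Streams
caching layer: the class `ToyCachedStore` and all of its methods are made up
for illustration, and a plain map stands in for the underlying storage.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: this is NOT a Kafka Streams class.
final class ToyCachedStore {
    // Stands in for the underlying storage (e.g. a RocksDB store).
    private final Map<String, Long> underlying = new LinkedHashMap<>();
    // Dirty entries that have been written but not yet persisted.
    private final Map<String, Long> dirty = new LinkedHashMap<>();

    void put(String key, Long value) {
        // A later write to the same key overwrites the earlier dirty value.
        dirty.put(key, value);
    }

    Long get(String key) {
        // Reads see the cached value first, then fall back to the storage.
        Long cached = dirty.get(key);
        return cached != null ? cached : underlying.get(key);
    }

    // Persists every dirty entry, clears the cache, and returns the
    // number of entries that were written to the underlying storage.
    int flush() {
        int written = dirty.size();
        underlying.putAll(dirty);
        dirty.clear();
        return written;
    }

    int persistedSize() {
        return underlying.size();
    }
}
```

In this model nothing reaches the underlying map until flush() is called,
and several writes to the same key result in a single persisted entry.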

So if your goal is to de-duplicate the downstream traffic, you can
consider using a punctuator to periodically flush the store and send all of
its key-value entries downstream; if your goal is to send downstream only
when the cache is flushed, you can consider overriding the `flush()`
function of the state store so that, after the flush, it sends all of the
key-value entries downstream.
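
As a rough illustration of the second option, here is a toy sketch in plain
Java of a store whose flush() forwards its entries after flushing. None of
these types are Kafka Streams classes: `ForwardOnFlushStore` is a made-up
name, and the `BiConsumer` is just a stand-in for whatever forwards records
to the downstream processor nodes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Toy model only: this is NOT a Kafka Streams class.
final class ForwardOnFlushStore {
    private final Map<String, Long> entries = new LinkedHashMap<>();
    // Hypothetical stand-in for forwarding a record downstream.
    private final BiConsumer<String, Long> downstream;

    ForwardOnFlushStore(BiConsumer<String, Long> downstream) {
        this.downstream = downstream;
    }

    void put(String key, Long value) {
        // Only the latest value per key is kept until the next flush.
        entries.put(key, value);
    }

    // Stand-in for an overridden flush(): a real store would persist to
    // its underlying storage first (elided here), then forward.
    void flush() {
        entries.forEach(downstream);
    }
}
```

With this shape, three writes such as ("a", 1), ("a", 2), ("b", 5) followed
by one flush() forward only two records, one per key with its latest value.
For the punctuator option, the same flush() would instead be called from a
periodically scheduled callback.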


Guozhang



On Thu, Feb 1, 2018 at 2:10 AM, Dmitry Minkovsky <dminkov...@gmail.com>
wrote:

> Right, but I want to forward messages to downstream processor nodes only
> when the store flushes. How does that happen automatically
> when KTableSourceProcessor sets that up to happen with a TupleForwarder?
>
> On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > If you build a store and enable caching, you get the KTable behavior out
> > of the box. No need to write any custom code in the processor itself.
> >
> >
> > StoreBuilder builder =
> > Stores.keyValueStoreBuilder(...).withCachingEnabled();
> >
> > topology.addStateStore(builder, ...);
> >
> >
> >
> > -Matthias
> >
> > On 1/31/18 6:19 PM, Dmitry Minkovsky wrote:
> > > I am writing a processor and I want its stores to behave like KTables:
> > > For consistency, I don't want to forward values until the stores have
> > > been flushed.
> > >
> > > I am looking at `ForwardingCacheFlushListener` and see that it is using
> > > `InternalProcessorContext` to change the current node, perform a
> > > forward, and then set the node back.
> > >
> > > Now, I could do the same (`InternalProcessorContext` is public), but:
> > > should I? I'm not well versed in the internals, so I am wondering what
> > > the ramifications of this might be. Is this okay to do? Should I?
> > >
> >
> >
>



