I was somehow not aware of this:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
... :/

On Thu, Feb 1, 2018 at 11:57 PM, Dmitry Minkovsky <dminkov...@gmail.com>
wrote:

> Thank you Guozhang.
>
> > related to your consistency requirement of the store? Do you mean
> flushing the cache to persist into the store or flushing the store
>
> Yes, motivated by consistency, I want to forward the state downstream only
> after LRU cache is persisted into the store on disk, and the store's
> changelog topic has been replicated.
>
> > So if your goal is to achieve de-duping the downstream traffic
>
> I am trying to make sure, to whatever degree is supposed by the library
> now, that downstream processors don't see a message that is the result of
> state that is possibly inconsistent.
>
> Thank you for describing those mechanisms. I will investigate them.
>
> On Thu, Feb 1, 2018 at 1:53 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Dmitry,
>>
>> Forwarding to downstream upon state store flushing is an internal
>> implementation detail that is used by the DSL operators only, and hence we
>> define related classes internals to abstract them away from the users.
>>
>> For your case, one thing I need to clarify is for "flushing the store", is
>> how that is related to your consistency requirement of the store? Do you
>> mean flushing the cache to persist into the store or flushing the store
>> (e.g. flush a rocksDB store) itself? Here are some things for you to
>> notice:
>>
>> 1. A state store can be flushed by calling store.flush() programmatically,
>> and if the store is a cached store it will also automatically flush the
>> cache on top of it to make sure all dirty keys are persisted to the
>> underlying storage.
>> 2. A state store will also be flushed whenever the processor topology
>> calls
>> commit(), which can either be user-triggered (context.commit() ) or based
>> on time period (there is a commit interval config).
>>
>> So if your goal is to achieve de-duping the downstream traffic, you can
>> consider using punctuator to periodically flush the store and send the
>> whole key-value map entries to downstream; if your goal is to only send to
>> downstream when the cache is flushed, you can consider overriding the
>> `flush()` function of the state store, that after the flush, send the
>> whole
>> key-value map entries to downstream.
>>
>>
>> Guozhang
>>
>>
>>
>> On Thu, Feb 1, 2018 at 2:10 AM, Dmitry Minkovsky <dminkov...@gmail.com>
>> wrote:
>>
>> > Right, but I want to forward messages to downstream processor nodes only
>> > when the store flushes. How does that happen automatically
>> > when KTableSourceProcessor sets that up to happen with a TupleForwarder?
>> >
>> > On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax <matth...@confluent.io>
>> > wrote:
>> >
>> > > If you build a store and enable caching, you get the KTable behavior
>> out
>> > > of the box. No need to write any custom code in the processor itself.
>> > >
>> > >
>> > > StoreBuilder builder =
>> > > Stores.keyValueStoreBuilder(...).withCachingEnabled();
>> > >
>> > > topology.addStateStore(builder, ...)
>> > >
>> > >
>> > >
>> > > -Matthias
>> > >
>> > > On 1/31/18 6:19 PM, Dmitry Minkovsky wrote:
>> > > > I am writing a processor and I want its stores to behave like
>> KTables:
>> > > For
>> > > > consistency, I don't want to forward values until the stores have
>> been
>> > > > flushed.
>> > > >
>> > > > I am looking at `ForwardingCacheFlushListener` and see that it is
>> using
>> > > > `InternalProcessorContext` to change the current node, perform a
>> > forward,
>> > > > and then set the node back.
>> > > >
>> > > > Now, I could do the same (`InternalProcessorContext` is public),
>> but:
>> > > > should I? I'm not well versed in the internals, so I am wondering
>> what
>> > > the
>> > > > ramifications of this might be. Is this okay to do? Should I?
>> > > >
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Reply via email to