Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

2018-02-05 Thread Dmitry Minkovsky
I was somehow not aware of this: https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams ... :/
On Thu, Feb 1, 2018 at 11:57 PM, Dmitry Minkovsky wrote:
> Thank you Guozhang.
>
> > related to your consistency requirement of

Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

2018-02-01 Thread Dmitry Minkovsky
Thank you Guozhang.
> related to your consistency requirement of the store? Do you mean flushing the cache to persist into the store or flushing the store
Yes, motivated by consistency, I want to forward the state downstream only after the LRU cache is persisted into the store on disk, and the

Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

2018-02-01 Thread Guozhang Wang
Hello Dmitry,
Forwarding to downstream upon state store flushing is an internal implementation detail that is used by the DSL operators only, and hence we define the related classes as internal to abstract them away from users. For your case, one thing I need to clarify is for "flushing the

Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

2018-02-01 Thread Dmitry Minkovsky
Right, but I want to forward messages to downstream processor nodes only when the store flushes. How does that happen automatically when KTableSourceProcessor sets that up to happen with a TupleForwarder?
On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax wrote:
> If you

Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

2018-01-31 Thread Matthias J. Sax
If you build a store and enable caching, you get the KTable behavior out of the box. No need to write any custom code in the processor itself.
    StoreBuilder builder = Stores.keyValueStoreBuilder(...).withCachingEnabled();
    topology.addStateStore(builder, ...)
-Matthias
On 1/31/18 6:19 PM,

Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?

2018-01-31 Thread Dmitry Minkovsky
I am writing a processor and I want its stores to behave like KTables: For consistency, I don't want to forward values until the stores have been flushed. I am looking at `ForwardingCacheFlushListener` and see that it is using `InternalProcessorContext` to change the current node, perform a
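
The "forward only after flush" behavior asked about in this thread can be sketched without any Kafka classes. The example below is a minimal illustration, not the Kafka Streams implementation: `FlushForwardingCache`, its `downstream` callback, and the in-memory `Map` standing in for the on-disk store are all hypothetical names chosen for this sketch. Writes are buffered, later writes to the same key replace earlier ones, and the downstream callback sees a value only once it has been written to the backing store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical illustration; none of these names come from Kafka Streams.
final class FlushForwardingCache<K, V> {
    private final Map<K, V> store;                          // stands in for the persistent store
    private final Map<K, V> dirty = new LinkedHashMap<>();  // buffered writes, not yet persisted
    private final BiConsumer<K, V> downstream;              // called only after persistence

    FlushForwardingCache(Map<K, V> store, BiConsumer<K, V> downstream) {
        this.store = store;
        this.downstream = downstream;
    }

    // Buffer the write; a later write to the same key replaces the earlier one.
    void put(K key, V value) {
        dirty.put(key, value);
    }

    // Check the buffer first so the caller can read its own unflushed writes.
    V get(K key) {
        return dirty.containsKey(key) ? dirty.get(key) : store.get(key);
    }

    // Persist every buffered entry, then forward the persisted entries downstream.
    void flush() {
        Map<K, V> flushed = new LinkedHashMap<>(dirty);
        dirty.clear();
        store.putAll(flushed);
        flushed.forEach(downstream);
    }
}

final class FlushForwardingDemo {
    public static void main(String[] args) {
        Map<String, Integer> store = new HashMap<>();
        List<String> forwarded = new ArrayList<>();
        FlushForwardingCache<String, Integer> cache =
                new FlushForwardingCache<>(store, (k, v) -> forwarded.add(k + "=" + v));

        cache.put("a", 1);
        cache.put("a", 2);   // replaces the buffered value for "a"
        cache.put("b", 7);
        System.out.println(forwarded);  // prints [] because nothing is forwarded before flush

        cache.flush();
        System.out.println(forwarded);  // prints [a=2, b=7]
    }
}
```

Because the intermediate value `a=1` is overwritten before the flush, downstream never sees it. This mirrors the deduplicating effect of a write-back cache in general; whether it matches a given Kafka Streams version in detail is not something this sketch claims.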