I think a main limitation would be that you cannot mix the 4 patterns
within a single application anymore (iff you use a "cached state"). If
you have a processor with a "cached state", this disables direct usage of
context.forward() completely -- if I understand the design correctly.
Thus, if a "cached state" is used, forwarding is only possible via state
updates.

The approach described above is fine from a DSL point of view. The main
question is whether a "cached state" should be a DSL-internal implementation
detail or should be exposed to the user for Processor API reuse. For the
former, the design is fine; for the latter, IMHO it imposes a limitation
and a hard-to-understand usage pattern on regular users of the Processor API.
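
To make the concern concrete, here is a minimal sketch of one processor mixing the four patterns listed in the quoted discussion below, as was possible pre-KIP. Everything in it is a hypothetical stand-in and not Kafka code: MixedPatternProcessor is an invented name, the HashMap plays the role of a state store, the downstream list plays the role of context.forward(), and the value prefixes used to pick a pattern are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: not a Kafka class, just plain Java stand-ins.
class MixedPatternProcessor {
    final Map<String, Long> store = new HashMap<>();    // stand-in for a state store
    final List<String> downstream = new ArrayList<>();  // stand-in for context.forward()

    void process(String key, String value) {
        if (value == null) {
            return;                                      // pattern 1: no state update, no output
        }
        if (value.startsWith("count:")) {
            store.merge(key, 1L, Long::sum);             // pattern 2: only state update
        } else if (value.startsWith("pass:")) {
            downstream.add(key + "=" + value);           // pattern 3: only output record
        } else {
            long updated = store.merge(key, 1L, Long::sum);
            downstream.add(key + "=" + updated);         // pattern 4: state update and output
        }
    }
}
```

With a "cached state" that owns forwarding, the third and fourth branches would no longer behave independently of the store, which is the limitation I mean.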


On 09/04/2016 05:28 PM, Matthias J. Sax wrote:
> We had a recent discussion about KIP-63, and I just c&p from the JIRA
> discussion:
> Damian:
>> During the code walk-through, Matthias raised a very good point about the 
>> use of context().forward being coupled to whether or not caching is enabled. 
>> Now that I've had the chance to think about it, I have one potential solution 
>> for making this transparent to users of the Processor API.
>> We can add another method, boolean isCachingEnabled(), to the new interface 
>> ForwardingStateStoreSupplier. We also add 2 new methods to ProcessorNode: 
>> boolean isStateStoreCachingEnabled() and void setStateStoreCachingEnabled(boolean).
>> In TopologyBuilder when we are creating the ProcessorNodeCacheFlushListener 
>> to attach to the ForwardingStateStoreSupplier we can call 
>> ProcessorNode.setStateStoreCachingEnabled(supplier.isCachingEnabled()).
>> We add an extra boolean parameter, forward, to ProcessorRecordContextImpl; 
>> this will be set to false when constructed from StreamTask and will be set 
>> to true when constructed from ProcessorNodeCacheFlushListener. Then in 
>> ProcessorRecordContextImpl.forward(..) we add a guard, if (shouldForward()), 
>> where shouldForward() returns forward || !node.stateStoreCachingEnabled().
>> Now Processors are free to call context().forward(..) whether caching is 
>> enabled or not. If it is enabled, the values just won't get forwarded until 
>> the cache evicts/flushes them.
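
A rough sketch of the guard Damian describes, as I read it. This is not the actual Kafka code: SketchNode and SketchRecordContext are hypothetical stand-ins for ProcessorNode and ProcessorRecordContextImpl, records are simplified to strings, and the downstream list only exists so the effect of the guard is observable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ProcessorNode; only the caching flag is modeled.
class SketchNode {
    private boolean stateStoreCachingEnabled;

    boolean isStateStoreCachingEnabled() {
        return stateStoreCachingEnabled;
    }

    void setStateStoreCachingEnabled(boolean enabled) {
        this.stateStoreCachingEnabled = enabled;
    }
}

// Hypothetical stand-in for ProcessorRecordContextImpl.
class SketchRecordContext {
    private final SketchNode node;
    // false when created on the normal processing path (StreamTask),
    // true when created by the cache flush listener.
    private final boolean forward;
    final List<String> downstream = new ArrayList<>();

    SketchRecordContext(SketchNode node, boolean forward) {
        this.node = node;
        this.forward = forward;
    }

    private boolean shouldForward() {
        return forward || !node.isStateStoreCachingEnabled();
    }

    void forward(String key, String value) {
        if (shouldForward()) {
            downstream.add(key + "=" + value);
        }
        // Otherwise the call is silently dropped; the record is expected to
        // reach downstream later via the cache's evict/flush path.
    }
}
```

Note that with caching enabled, a forward() call on the normal path is simply swallowed, so a record that was never written to the store would never be emitted under this sketch.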
> Matthias:
>> I guess this is a good solution/workaround. I had something like this in my 
>> mind during the call, too.
>> However, thinking about the root cause of this issue again, I am not sure if 
>> the (overall) design of this KIP is optimal or not. My new concern is that 
>> with this caching strategy, we "merge" two concepts into one, and I am not 
>> sure if we should do this.
>> Currently, message flow and state are decoupled and independent of each 
>> other. Thus, if there is a state, updates to the state are completely 
>> independent from emitting output records. With the new design, we merge 
>> state updates and record emits, limiting the overall flexibility. I guess, 
>> from a DSL point of view, this would not be problematic, because in an 
>> aggregation and changelog output, each update to the state should result in 
>> a downstream record. However, from a Processor API point of view, there are 
>> other patterns we want to be able to support, too.
>> Basically, for each input record, there are four different patterns that could 
>> be applied by the user:
>>     no state updates, no output records
>>     only state update
>>     only output records
>>     state updates and output records
>> Right now, we go with a design that allows using one of the patterns within 
>> a Processor. However, all 4 patterns could be mixed within a single Processor 
>> (pre-KIP design), and this mixture would not be possible anymore. If we 
>> want to support all four cases, we might not want to merge both into "a 
>> single abstraction" as we do in the design of this PR. What if a user just 
>> wants to send a record downstream (without any state manipulation)?
>> Going back to the KIP design, we move the cache from RocksDB into the 
>> processor. However, what we actually wanted to do was to de-duplicate output 
>> records. Thus, the newly introduced cache could actually go "after the 
>> processor" and could be completely independent from the state. Thus, on each 
>> call to forward() the record is put into the cache, and if the cache is 
>> full, an actual cache eviction and record forwarding happens. This would 
>> make the de-duplication cache independent from the state.
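
The alternative of putting the cache "after the processor" could look roughly like the sketch below. It is only an illustration under my own assumptions: DedupForwardCache is a hypothetical name, records are plain strings, capacity is counted in entries rather than bytes, and eviction takes the oldest inserted key, none of which is specified by the KIP.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical de-duplication cache that sits behind forward() and is
// completely independent of any state store.
class DedupForwardCache {
    private final int maxEntries;
    private final LinkedHashMap<String, String> cache = new LinkedHashMap<>();
    final List<String> downstream = new ArrayList<>(); // stand-in for the real downstream

    DedupForwardCache(int maxEntries) {
        this.maxEntries = maxEntries;
    }

    // Called instead of forwarding directly: a newer value for the same key
    // overwrites the cached one, so only the latest value is emitted later.
    void forward(String key, String value) {
        cache.put(key, value);
        if (cache.size() > maxEntries) {
            Iterator<Map.Entry<String, String>> it = cache.entrySet().iterator();
            Map.Entry<String, String> eldest = it.next();
            String evictedKey = eldest.getKey();
            String evictedValue = eldest.getValue();
            it.remove();
            emit(evictedKey, evictedValue);
        }
    }

    // Emits everything still cached, e.g. on commit.
    void flush() {
        for (Map.Entry<String, String> e : cache.entrySet()) {
            emit(e.getKey(), e.getValue());
        }
        cache.clear();
    }

    private void emit(String key, String value) {
        downstream.add(key + "=" + value);
    }
}
```

In this shape a processor can update state without emitting, or emit without touching state, and de-duplication still applies to whatever is forwarded.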
> Eno:
>> It's not entirely true that the flexibility is limited. For example, what's 
>> next in implementation is https://issues.apache.org/jira/browse/KAFKA-3779 
>> where we add the dedup cache to the to() operator. That is not implemented yet.
> Damian:
>> I think, of the 4 patterns you mentioned, only the last one changes, i.e., 
>> state updates and output records.
>> context.forward() still exists so you can just send a record downstream 
>> without any state manipulation, that behaviour hasn't changed.
> On 08/24/2016 03:35 PM, Eno Thereska wrote:
>> Hi folks,
>> We've been working on a proof-of-concept for KIP-63 and that can now be
>> found at the main JIRA (https://issues.apache.org/jira/browse/KAFKA-3776)
>> under PR https://github.com/apache/kafka/pull/1752. It is still a work in
>> progress; however, we are confident that the basic structure is there.
>> As part of this work, we've also updated the KIP to clarify several things,
>> listed here for convenience:
>> - Clarify that the optimization is applicable to aggregations and to()
>> operators. It is not applicable to joins.
>> - Clarify that for the low-level Processor API, we propose to allow users
>> to disable caching on a store-by-store basis using a new
>> .enableCaching() call.
>> We'll start the voting process shortly for this KIP.
>> Thanks
>> Eno
>> On Thu, Jun 2, 2016 at 11:30 AM, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>>> Hi there,
>>> I have created KIP-63: Unify store and downstream caching in streams
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>>> Feedback is appreciated.
>>> Thank you
>>> Eno
