Paul Whalen created KAFKA-7523:
----------------------------------

             Summary: TransformerSupplier/ProcessorSupplier enhancements
                 Key: KAFKA-7523
                 URL: https://issues.apache.org/jira/browse/KAFKA-7523
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Paul Whalen


I have found that when writing "low level" {{Processors}} and {{Transformers}} 
that are stateful, often I want these processors to "own" one or more state 
stores, the details of which are not important to the business logic of the 
application.  However, when incorporating these into the topologies defined by 
the high level API, using {{KStream::transform}} or {{KStream::process}}, I'm 
forced to specify the stores so the topology is wired up correctly.  This 
creates an unfortunate pattern where the {{TransformerSupplier}} or 
{{ProcessorSupplier,}} who (according to the pattern I've been following) holds 
the information about the name of the state stores, must be defined above the 
"high level" "fluent API"-style pipeline, which makes it hard to understand the 
business logic data flow.

 

What I currently have to do:
{code:java}
TransformerSupplier transformerSupplier = new 
TransformerSupplierWithState(topology, val -> businessLogic(val));
builder.stream("in.topic")
        .transform(transformerSupplier, transformerSupplier.stateStoreNames())
        .to("out.topic");{code}
I have to both define the {{TransformerSupplier}} above the "fluent block", and 
pass the topology in so I can call {{topology.addStateStore()}} inside the 
{{TransformerSupplier}} constructor and tell the {{StreamsBuilder}} what the 
state store names are for that point in the topology. The lambda {{val -> 
businessLogic(val)}} is really what I want to see in-line because that's the 
crux of what is happening, along with the name of some factory method 
describing what the transformer is doing for me internally. This issue is 
obviously exacerbated when the "fluent block" is much longer than this example 
- It gets worse the farther away {{val -> businessLogic(val)}} is from 
{{KStream::transform}}.

 
An improvement:
{code:java}
builder.stream("in.topic")
        .transform(transformerSupplierWithState(topology, val -> 
businessLogic(val)))
        .to("out.topic");{code}
Which implies the existence of a {{KStream::transform}} that takes a single 
argument that adheres to this interface:
{code:java}
interface TransformerSupplierWithState<K, V, R> {
    Transformer<K, V, R> get();
    String[] stateStoreNames();
}{code}
Or better yet, I wouldn't have to pass in the topology, the caller of 
{{TransformerSupplierWithState}} could also handle the job of "adding" its 
state stores to the topology:
{code:java}
interface TransformerSupplierWithState<K, V, R> {
    Transformer<K, V, R> get();
    Map<String, StoreBuilder> stateStores();
}{code}
Which would enable my ideal:
{code:java}
builder.stream("in.topic")
        .transform(transformerSupplierWithState(val -> businessLogic(val)))
        .to("out.topic");{code}
I think this would be a huge improvement in the usability of low-level 
processors with the high-level DSL.

Please let me know if I'm missing something as to why this cannot or should not 
happen, or if there is a better forum for this suggestion (presumably it would 
require a KIP?). I'd be happy to build it as well if there is a chance of it 
being merged, it doesn't seem like a huge challenge to me.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to