[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17158505#comment-17158505
 ] 

highluck edited comment on KAFKA-5488 at 7/15/20, 4:20 PM:
-----------------------------------------------------------

I think there are some unnecessary interfaces
{code:java}
Branched<K, V> with(Function<? super KStream<? super K, ? super V>, ? extends 
KStream<? extends K, ? extends V>> chain)
{code}
I don't think there is a need to take a function as a parameter, That's enough 
for the consumer

 
{code:java}
 Map<String, KStream<K, V>> defaultBranch();{code}
I don't think the return type needs to be a Map, Wouldn't it be better to unify 
the return type to BranchedKStream?

 

[~mjsax]

{{What do you think?!}}

 


was (Author: high.lee):
I think there are some unnecessary interfaces

 
{code:java}
Branched<K, V> with(Function<? super KStream<? super K, ? super V>, ? extends 
KStream<? extends K, ? extends V>> chain)
{code}
I don't think there is a need to take a function as a parameter, That's enough 
for the consumer

 

 

 
{code:java}
 Map<String, KStream<K, V>> defaultBranch();{code}
 

I don't think the return type needs to be a Map, Wouldn't it be better to unify 
the return type to BranchedKStream?

 

[~mjsax]

{{What do you think?!}}

 

> KStream.branch should not return a Array of streams we have to access by 
> known index
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5488
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5488
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Marcel "childNo͡.de" Trautwein
>            Assignee: highluck
>            Priority: Major
>              Labels: kip
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         KStream<byte[], EventType>[] branchedStreams= new KStreamBuilder()
>         .<byte[], EventType>stream("eventTopic")
>         .branch(
>             (k, v) -> EventType::validData
>             (k, v) -> true
>         );
>         
>         branchedStreams[0]
>         .to("topicValidData");
>         
>         branchedStreams[1]
>         .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<>, 
> Consumer<KStream<>>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:java}
>         new KStreamBuilder()
>         .<byte[], EventType>stream("eventTopic")
>         .branch(
>             Branch.create(
>                 (k, v) -> EventType::validData,
>                 stream -> stream.to("topicValidData")
>             ),
>             Branch.create(
>                 (k, v) -> true,
>                 stream -> stream.to("topicInvalidData")
>             )
>         );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to