[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17158505#comment-17158505 ]
highluck edited comment on KAFKA-5488 at 7/15/20, 4:20 PM: ----------------------------------------------------------- I think there are some unnecessary interfaces {code:java} Branched<K, V> with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain) {code} I don't think there is a need to take a function as a parameter, That's enough for the consumer {code:java} Map<String, KStream<K, V>> defaultBranch();{code} I don't think the return type needs to be a Map, Wouldn't it be better to unify the return type to BranchedKStream? [~mjsax] {{What do you think?!}} was (Author: high.lee): I think there are some unnecessary interfaces {code:java} Branched<K, V> with(Function<? super KStream<? super K, ? super V>, ? extends KStream<? extends K, ? extends V>> chain) {code} I don't think there is a need to take a function as a parameter, That's enough for the consumer {code:java} Map<String, KStream<K, V>> defaultBranch();{code} I don't think the return type needs to be a Map, Wouldn't it be better to unify the return type to BranchedKStream? [~mjsax] {{What do you think?!}} > KStream.branch should not return a Array of streams we have to access by > known index > ------------------------------------------------------------------------------------ > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Marcel "childNo͡.de" Trautwein > Assignee: highluck > Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream<byte[], EventType>[] branchedStreams= new KStreamBuilder() > .<byte[], EventType>stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<>, > Consumer<KStream<>>>... branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .<byte[], EventType>stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)