[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ivan Ponomarev closed KAFKA-5488. --------------------------------- PR is merged to trunk to be included in 2.8.0 release > KStream.branch should not return a Array of streams we have to access by > known index > ------------------------------------------------------------------------------------ > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Marcel "childNo͡.de" Trautwein > Assignee: Ivan Ponomarev > Priority: Major > Labels: kip > Fix For: 2.8.0 > > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream<byte[], EventType>[] branchedStreams= new KStreamBuilder() > .<byte[], EventType>stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<>, > Consumer<KStream<>>>... branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .<byte[], EventType>stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)