[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159164#comment-17159164
 ] 

Ivan Ponomarev commented on KAFKA-5488:
---------------------------------------

Hi everyone, 
{quote}I think there are some unnecessary interfaces

I don't think the return type needs to be a Map,
{quote}
[~high.lee] , concerning your comment about the API: the current API is a 
result of the extensive discussion (you can find the link to the discussion in 
the KIP itself).  The first versions of this KIP didn't have Map return type 
and Function as a parameter, but there was a concern that all the branches will 
be in separate variable scopes, which is inconvenient in many cases. There was 
a really hard discussion with a number of ideas proposed and rejected, what we 
have now seems to be the best choice from many points of view.
{quote}Are you willing to continue working?
{quote}
Sure, since I proposed this KIP, I'm going to implement it. I've been quite 
busy recently, but I really hope that I'll be able to post a PR from me in one 
or maximum two weeks.

 

 

> KStream.branch should not return a Array of streams we have to access by 
> known index
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5488
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5488
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Marcel "childNo͡.de" Trautwein
>            Assignee: highluck
>            Priority: Major
>              Labels: kip
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         KStream<byte[], EventType>[] branchedStreams= new KStreamBuilder()
>         .<byte[], EventType>stream("eventTopic")
>         .branch(
>             (k, v) -> EventType::validData
>             (k, v) -> true
>         );
>         
>         branchedStreams[0]
>         .to("topicValidData");
>         
>         branchedStreams[1]
>         .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<>, 
> Consumer<KStream<>>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:java}
>         new KStreamBuilder()
>         .<byte[], EventType>stream("eventTopic")
>         .branch(
>             Branch.create(
>                 (k, v) -> EventType::validData,
>                 stream -> stream.to("topicValidData")
>             ),
>             Branch.create(
>                 (k, v) -> true,
>                 stream -> stream.to("topicInvalidData")
>             )
>         );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to