Hello Paul,

I'm afraid this won't work, because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.
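
To illustrate the point about onTopOf returning its argument, here is a minimal sketch that uses a plain List as a stand-in for KStream. The class name ListBrancher and its internals are assumptions made for illustration only; this is not the KIP's implementation and not the Kafka Streams API. As with the current branch(), an element goes to the first predicate it matches, and without a default branch the unmatched elements are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical toy model: List<T> plays the role of KStream<K, V>.
final class ListBrancher<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();
    private Consumer<List<T>> defaultHandler; // optional, may stay null

    ListBrancher<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    ListBrancher<T> defaultBranch(Consumer<List<T>> handler) {
        defaultHandler = handler;
        return this;
    }

    // Terminal step: the switch is built here, and the source is returned as is.
    List<T> onTopOf(List<T> source) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> rest = new ArrayList<>();
        for (T item : source) {
            int i = 0;
            // Find the first predicate that matches this item.
            while (i < predicates.size() && !predicates.get(i).test(item)) {
                i++;
            }
            if (i < predicates.size()) {
                buckets.get(i).add(item);
            } else {
                rest.add(item);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        if (defaultHandler != null) {
            defaultHandler.accept(rest);
        }
        return source;
    }
}

class ListBrancherDemo {
    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5, 6);
        List<Integer> same = new ListBrancher<Integer>()
            .addBranch(n -> n % 2 == 0, evens -> System.out.println("even: " + evens))
            .addBranch(n -> n > 4, big -> System.out.println("odd and > 4: " + big))
            .onTopOf(source); // no defaultBranch: 1 and 3 are dropped
        System.out.println(same == source); // prints "true"
    }
}
```

Because onTopOf is the terminal call, the brancher knows at that moment that no more branches will be added, whether or not defaultBranch was used.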

I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.

Regards,

Ivan



24.03.2019 4:02, Paul Whalen wrote:
Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts with the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream.

Like:

stream.branch()
         .addBranch(predicate1, this::handle1)
         .addBranch(predicate2, this::handle2)
         .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something,
which is added to by addBranch() and terminated by defaultBranch() (which
returns void).  This is obviously incompatible with the current API, so the
new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched() or
branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP?  It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build branches off of KBranchedStreams if desired.
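
To make the shape concrete, here is a rough sketch of how such a builder could behave, with a toy class standing in for KStream. ToyStream, ToyBranchedStreams, and the method name branched() are placeholders I am assuming for illustration; none of them exist in Kafka Streams. In this sketch nothing is routed until defaultBranch() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for KStream; it only holds what the sketch needs.
final class ToyStream<T> {
    final List<T> records;

    ToyStream(List<T> records) {
        this.records = records;
    }

    // Named branched() here because branch() is taken by the current API.
    ToyBranchedStreams<T> branched() {
        return new ToyBranchedStreams<>(this);
    }
}

final class ToyBranchedStreams<T> {
    private final ToyStream<T> source;
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<ToyStream<T>>> handlers = new ArrayList<>();

    ToyBranchedStreams(ToyStream<T> source) {
        this.source = source;
    }

    ToyBranchedStreams<T> addBranch(Predicate<T> predicate, Consumer<ToyStream<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Terminal call: routes every record, then hands each branch to its handler.
    void defaultBranch(Consumer<ToyStream<T>> defaultHandler) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i <= predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        for (T record : source.records) {
            int i = 0;
            while (i < predicates.size() && !predicates.get(i).test(record)) {
                i++;
            }
            buckets.get(i).add(record); // index predicates.size() is the default bucket
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(new ToyStream<>(buckets.get(i)));
        }
        defaultHandler.accept(new ToyStream<>(buckets.get(predicates.size())));
    }
}

class ToyBranchedDemo {
    public static void main(String[] args) {
        new ToyStream<String>(List.of("a", "bb", "ccc"))
            .branched()
            .addBranch(s -> s.length() == 1, b -> System.out.println("short: " + b.records))
            .addBranch(s -> s.length() == 2, b -> System.out.println("medium: " + b.records))
            .defaultBranch(b -> System.out.println("other: " + b.records));
    }
}
```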

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid>
wrote:

Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream<String, String> ks){
         ks.filter(....).mapValues(...)
}


void handleSecondCase(KStream<String, String> ks){
         ks.selectKey(...).groupByKey()...
}

......
new KafkaStreamsBrancher<String, String>()
    .addBranch(predicate1, this::handleFirstCase)
    .addBranch(predicate2, this::handleSecondCase)
    .onTopOf(....)

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck wrote:
Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as a second
argument, which returns nothing, and the example in the KIP shows each
stream from the branch using a terminal node (KStream#to() in this case).

Maybe I've missed something, but how would we handle the case where the
user has created a branch but wants to continue processing and not
necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is, suppose we had something like this:

KStream<String, String>[] branches = originalStream.branch(predicate1,
predicate2);
branches[0].filter(....).mapValues(...)..
branches[1].selectKey(...).groupByKey().....


Thanks!
Bill



On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the
KIP if you can, I would appreciate any feedback :)

KIP-418:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488

PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev



