Hi Ivan,

Thanks for the update.

FWIW, I agree with Matthias that the current "start branching" operator is
confusing when named the same way as the actual branches. "Split" seems
like a good name. Alternatively, we can do without a "start branching"
operator at all, and just do:

stream
      .branch(Predicate)
      .branch(Predicate)
      .defaultBranch();

Tentatively, I think that this branching operation should be terminal. That
way, we don't create ambiguity about how to use it. That is, `branch`
should return `KBranchedStream`, while `defaultBranch` is `void`, to
enforce that it comes last, and that there is only one definition of the
default branch. Potentially, we should log a warning if there's no default,
and additionally log a warning (or throw an exception) if a record falls
through with no default.
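
To make the typing argument concrete, here is a rough sketch of what I have
in mind. It is plain Java with no Kafka dependency; `BranchSketch`,
`BranchedStream` and `route` are hypothetical names for illustration only,
not the KIP's interface or the PR's implementation. The only point is that
`branch` returns the branched type while `defaultBranch` returns `void`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-ins, NOT the Kafka Streams API: they only model the
// return types discussed above.
final class BranchSketch {

    static final class BranchedStream<V> {
        private final List<Predicate<V>> predicates = new ArrayList<>();
        private boolean hasDefault = false;

        // Non-terminal: returns the same type, so more branches can follow.
        BranchedStream<V> branch(Predicate<V> predicate) {
            predicates.add(predicate);
            return this;
        }

        // Terminal: returns void, so a fluent chain has to end here.
        void defaultBranch() {
            hasDefault = true;
        }

        // Index of the first matching branch; predicates.size() stands for
        // the default branch; -1 means the record fell through with no default.
        int route(V value) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(value)) {
                    return i;
                }
            }
            return hasDefault ? predicates.size() : -1;
        }
    }

    public static void main(String[] args) {
        BranchedStream<Integer> branched = new BranchedStream<>();
        branched.branch(v -> v < 0)
                .branch(v -> v == 0)
                .defaultBranch();
        // Appending .branch(...) after defaultBranch() would not compile,
        // because defaultBranch() returns void.

        System.out.println(branched.route(-5)); // 0: first predicate matches
        System.out.println(branched.route(0));  // 1: second predicate matches
        System.out.println(branched.route(7));  // 2: default branch
    }
}
```

In this sketch the -1 result of `route` is the "record falls through with no
default" case, which is where the warning (or exception) would go.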

Thoughts?

Thanks,
-John

On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for updating the KIP and your answers.
>
>
> > this is to make the name similar to String#split
> > that also returns an array, right?
>
> The intent was to avoid name duplication. The return type should _not_
> be an array.
>
> The current proposal is
>
> stream.branch()
>       .branch(Predicate)
>       .branch(Predicate)
>       .defaultBranch();
>
> IMHO, this reads a little odd, because the first `branch()` does not
> take any parameters and has different semantics than the later
> `branch()` calls. Note that the code snippet above hides the fact that
> the first call is `KStream#branch()` while the others are
> `KBranchedStream#branch()`, which makes the code harder to read.
>
> Because I suggested to rename `addBranch()` -> `branch()`, I thought it
> might be better to also rename `KStream#branch()` to avoid the naming
> overlap that seems to be confusing. The following reads much cleaner to me:
>
> stream.split()
>       .branch(Predicate)
>       .branch(Predicate)
>       .defaultBranch();
>
> Maybe there is a better alternative to `split()` though to avoid the
> naming overlap.
>
>
> > 'default' is, however, a reserved word, so unfortunately we cannot have
> > a method with such name :-)
>
> Bummer. Didn't consider this. Maybe we can still come up with a short name?
>
>
> Can you add the interface `KBranchedStream` to the KIP with all its
> methods? It will be part of public API and should be contained in the
> KIP. For example, it's unclear atm, what the return type of
> `defaultBranch()` is.
>
>
> You did not comment on the idea to add a `KBranchedStream#get(int index)
> -> KStream` method to get the individually branched-KStreams. Would be
> nice to get your feedback about it. It seems you suggest that users
> would need to write custom utility code otherwise, to access them. We
> should discuss the pros and cons of both approaches. It feels
> "incomplete" to me atm, if the API has no built-in support to get the
> branched-KStreams directly.
>
>
>
> -Matthias
>
>
> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > Hi all!
> >
> > I have updated the KIP-418 according to the new vision.
> >
> > Matthias, thanks for your comment!
> >
> >> Renaming KStream#branch() -> #split()
> >
> > I can see your point: this is to make the name similar to String#split
> > that also returns an array, right? But is it worth the loss of backwards
> > compatibility? We can have overloaded branch() as well without affecting
> > the existing code. Maybe the old array-based `branch` method should be
> > deprecated, but this is a subject for discussion.
> >
> >> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >
> > Totally agree with 'addBranch->branch' rename. 'default' is, however, a
> > reserved word, so unfortunately we cannot have a method with such name :-)
> >
> >> defaultBranch() does take a `Predicate` as argument, but I think that
> >> is not required?
> >
> > Absolutely! I think that was just copy-paste error or something.
> >
> > Dear colleagues,
> >
> > please review the new version of the KIP and Paul's PR
> > (https://github.com/apache/kafka/pull/6512)
> >
> > Any new suggestions/objections?
> >
> > Regards,
> >
> > Ivan
> >
> >
> > 11.04.2019 11:47, Matthias J. Sax wrote:
> >> Thanks for driving the discussion of this KIP. It seems that everybody
> >> agrees that the current branch() method using arrays is not optimal.
> >>
> >> I had a quick look into the PR and I like the overall proposal. There
> >> are some minor things we need to consider. I would recommend the
> >> following renaming:
> >>
> >> KStream#branch() -> #split()
> >> KBranchedStream#addBranch() -> BranchingKStream#branch()
> >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >>
> >> It's just a suggestion to get slightly shorter method names.
> >>
> >> In the current PR, defaultBranch() does take a `Predicate` as argument,
> >> but I think that is not required?
> >>
> >> Also, we should consider KIP-307, which was recently accepted and is
> >> currently being implemented:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> >>
> >> I.e., we should add overloads that accept a `Named` parameter.
> >>
> >>
> >> For the issue that the created `KStream` objects are in different scopes:
> >> could we extend `KBranchedStream` with a `get(int index)` method that
> >> returns the corresponding "branched" result `KStream` object? Maybe, the
> >> second argument of `addBranch()` should not be a `Consumer<KStream>` but
> >> a `Function<KStream,KStream>` and `get()` could return whatever the
> >> `Function` returns?
> >>
> >>
> >> Finally, I would also suggest to update the KIP with the current
> >> proposal. That makes it easier to review.
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 3/31/19 12:22 PM, Paul Whalen wrote:
> >>> Ivan,
> >>>
> >>> I'm a bit of a novice here as well, but I think it makes sense for you
> >>> to revise the KIP and continue the discussion.  Obviously we'll need
> >>> some buy-in from committers that have actual binding votes on whether
> >>> the KIP could be adopted.  It would be great to hear if they think this
> >>> is a good idea overall.  I'm not sure if that happens just by starting
> >>> a vote, or if there is generally some indication of interest beforehand.
> >>>
> >>> That being said, I'll continue the discussion a bit: assuming we do
> >>> move forward with the solution of "stream.branch() returns
> >>> KBranchedStream", do we deprecate "stream.branch(...) returns
> >>> KStream[]"?  I would favor deprecating, since having two mutually
> >>> exclusive APIs that accomplish the same thing is confusing, especially
> >>> when they're fairly similar anyway.  We just need to be sure we're not
> >>> making something impossible/difficult that is currently possible/easy.
> >>>
> >>> Regarding my PR - I think the general structure would work, it's just a
> >>> little sloppy overall in terms of naming and clarity. In particular,
> >>> passing in the "predicates" and "children" lists which get modified in
> >>> KBranchedStream but read from all the way in KStreamLazyBranch is a bit
> >>> complicated to follow.
> >>>
> >>> Thanks,
> >>> Paul
> >>>
> >>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> >>>
> >>>> Hi Paul!
> >>>>
> >>>> I read your code carefully and now I am fully convinced: your proposal
> >>>> looks better and should work. We just have to document the crucial
> >>>> fact that KStream consumers are invoked as they're added. And then
> >>>> it's all going to be very nice.
> >>>>
> >>>> What shall we do now? I should re-write the KIP and resume the
> >>>> discussion here, right?
> >>>>
> >>>> Why do you say that your PR 'should not be even a starting point if
> >>>> we go in this direction'? To me it looks like a good starting point.
> >>>> But as a novice in this project I might miss some important details.
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan
> >>>>
> >>>>
> >>>> 28.03.2019 17:38, Paul Whalen wrote:
> >>>>> Ivan,
> >>>>>
> >>>>> Maybe I’m missing the point, but I believe the stream.branch()
> >>>>> solution supports this. The couponIssuer::set* consumers will be
> >>>>> invoked as they’re added, not during streamsBuilder.build(). So the
> >>>>> user still ought to be able to call couponIssuer.coupons() afterward
> >>>>> and depend on the branched streams having been set.
> >>>>>
> >>>>> The issue I mean to point out is that it is hard to access the
> >>>>> branched streams in the same scope as the original stream (that is,
> >>>>> not inside the couponIssuer), which is a problem with both proposed
> >>>>> solutions. It can be worked around though.
> >>>>>
> >>>>> [Also, great to hear additional interest in 401, I’m excited to hear
> >>>>> your thoughts!]
> >>>>>
> >>>>> Paul
> >>>>>
> >>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
> >>>>>>
> >>>>>> Hi Paul!
> >>>>>>
> >>>>>> The idea to postpone the wiring of branches to the
> >>>>>> streamsBuilder.build() also looked great to me at first glance, but ---
> >>>>>>
> >>>>>>> the newly branched streams are not available in the same scope as
> >>>>>>> each other.  That is, if we wanted to merge them back together
> >>>>>>> again I don't see a way to do that.
> >>>>>>
> >>>>>> You just took the words right out of my mouth, I was just going to
> >>>>>> write in detail about this issue.
> >>>>>>
> >>>>>> Consider the example from Bill's book, p. 101: say we need to
> >>>>>> identify customers who have bought coffee and made a purchase in the
> >>>>>> electronics store to give them coupons.
> >>>>>>
> >>>>>> This is the code I usually write under these circumstances using my
> >>>>>> 'brancher' class:
> >>>>>>
> >>>>>> @Setter
> >>>>>> class CouponIssuer{
> >>>>>>    private KStream<....> coffePurchases;
> >>>>>>    private KStream<....> electronicsPurchases;
> >>>>>>
> >>>>>>    KStream<...> coupons(){
> >>>>>>        return coffePurchases.join(electronicsPurchases...)...whatever
> >>>>>>
> >>>>>>        /*In the real world the code here can be complex, so creation
> >>>>>>        of a separate CouponIssuer class is fully justified, in order
> >>>>>>        to separate classes' responsibilities.*/
> >>>>>>   }
> >>>>>> }
> >>>>>>
> >>>>>> CouponIssuer couponIssuer = new CouponIssuer();
> >>>>>>
> >>>>>> new KafkaStreamsBrancher<....>()
> >>>>>>      .branch(predicate1, couponIssuer::setCoffePurchases)
> >>>>>>      .branch(predicate2, couponIssuer::setElectronicsPurchases)
> >>>>>>      .onTopOf(transactionStream);
> >>>>>>
> >>>>>> /*Alas, this won't work if we're going to wire up everything later,
> >>>>>> without the terminal operation!!!*/
> >>>>>> couponIssuer.coupons()...
> >>>>>>
> >>>>>> Does this make sense?  In order to properly initialize the
> >>>>>> CouponIssuer we need the terminal operation to be called before
> >>>>>> streamsBuilder.build() is called.
> >>>>>>
> >>>>>> [BTW Paul, I just found out that your KIP-401 is essentially the
> >>>>>> next KIP I was going to write here. I have some thoughts based on my
> >>>>>> experience, so I will join the discussion on KIP-401 soon.]
> >>>>>>
> >>>>>> Regards,
> >>>>>>
> >>>>>> Ivan
> >>>>>>
> >>>>>> 28.03.2019 6:29, Paul Whalen wrote:
> >>>>>>> Ivan,
> >>>>>>> I tried to make a very rough proof of concept of a fluent API based
> >>>>>>> off of KStream here (https://github.com/apache/kafka/pull/6512), and
> >>>>>>> I think I succeeded at removing both cons.
> >>>>>>>
> >>>>>>>     - Compatibility: I was incorrect earlier about compatibility
> >>>>>>>     issues, there aren't any direct ones.  I was unaware that Java is
> >>>>>>>     smart enough to distinguish between a branch(varargs...)
> >>>>>>>     returning one thing and branch() with no arguments returning
> >>>>>>>     another thing.
> >>>>>>>     - Requiring a terminal method: We don't actually need it.  We can
> >>>>>>>     just build up the branches in the KBranchedStream, which shares
> >>>>>>>     its state with the ProcessorSupplier that will actually do the
> >>>>>>>     branching.  It's not terribly pretty in its current form, but I
> >>>>>>>     think it demonstrates its feasibility.
> >>>>>>>
> >>>>>>> To be clear, I don't think that pull request should be final or even
> >>>>>>> a starting point if we go in this direction, I just wanted to see how
> >>>>>>> challenging it would be to get the API working.
> >>>>>>>
> >>>>>>> I will say though, that I'm not sure the existing solution could be
> >>>>>>> deprecated in favor of this, which I had originally suggested was a
> >>>>>>> possibility.  The reason is that the newly branched streams are not
> >>>>>>> available in the same scope as each other.  That is, if we wanted to
> >>>>>>> merge them back together again I don't see a way to do that.  The KIP
> >>>>>>> proposal has the same issue, though - all this means is that for
> >>>>>>> either solution, deprecating the existing branch(...) is not on the
> >>>>>>> table.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Paul
> >>>>>>>
> >>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru>
> >>>>>>>> wrote:
> >>>>>>>> OK, let me summarize what we have discussed up to this point.
> >>>>>>>>
> >>>>>>>> First, it seems that it's commonly agreed that branch API needs
> >>>>>>>> improvement. Motivation is given in the KIP.
> >>>>>>>>
> >>>>>>>> There are two potential ways to do it:
> >>>>>>>>
> >>>>>>>> 1. (as originally proposed)
> >>>>>>>>
> >>>>>>>> new KafkaStreamsBrancher<..>()
> >>>>>>>>     .branch(predicate1, ks ->..)
> >>>>>>>>     .branch(predicate2, ks->..)
> >>>>>>>>     .defaultBranch(ks->..) //optional
> >>>>>>>>     .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
> >>>>>>>>
> >>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense
> >>>>>>>> until all the necessary ingredients are provided.
> >>>>>>>>
> >>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts
> >>>>>>>> with the fluency of other KStream methods.
> >>>>>>>>
> >>>>>>>> 2. (as Paul proposes)
> >>>>>>>>
> >>>>>>>> stream
> >>>>>>>>     .branch(predicate1, ks ->...)
> >>>>>>>>     .branch(predicate2, ks->...)
> >>>>>>>>     .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..)
> >>>>>>>>                             //and noDefault() return void
> >>>>>>>>
> >>>>>>>> PROS: Generally follows the way KStreams interface is defined.
> >>>>>>>>
> >>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->)
> >>>>>>>> and noDefault()). And for a user it is very easy to miss the fact
> >>>>>>>> that one of the terminal methods should be called. If these methods
> >>>>>>>> are not called, we can throw an exception at runtime.
> >>>>>>>>
> >>>>>>>> Colleagues, what are your thoughts? Can we do better?
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>>
> >>>>>>>> Ivan
> >>>>>>>>
> >>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
> >>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
> >>>>>>>>>> Paul,
> >>>>>>>>>>
> >>>>>>>>>> I see your point when you are talking about
> >>>>>>>>>> stream..branch..branch...default..
> >>>>>>>>>>
> >>>>>>>>>> Still, I believe that this cannot be implemented the easy way.
> >>>>>>>>>> Maybe we all should think further.
> >>>>>>>>>>
> >>>>>>>>>> Let me comment on two of your ideas.
> >>>>>>>>>>
> >>>>>>>>>>> user could specify a terminal method that assumes nothing will
> >>>>>>>>>>> reach the default branch, throwing an exception if such a case
> >>>>>>>>>>> occurs.
> >>>>>>>>>>
> >>>>>>>>>> 1) OK, apparently this should not be the only option besides
> >>>>>>>>>> `default`, because there are scenarios when we want to just
> >>>>>>>>>> silently drop the messages that didn't match any predicate.
> >>>>>>>>>> 2) Throwing an exception in the middle of data flow processing
> >>>>>>>>>> looks like a bad idea. In the stream processing paradigm, I would
> >>>>>>>>>> prefer to emit a special message to a dedicated stream. This is
> >>>>>>>>>> exactly where `default` can be used.
> >>>>>>>>>>
> >>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to
> >>>>>>>>>>> track dangling branches that haven't been terminated and raise a
> >>>>>>>>>>> clear error before it becomes an issue.
> >>>>>>>>>>
> >>>>>>>>>> You mean a runtime exception, when the program is compiled and
> >>>>>>>>>> run? Well, I'd prefer an API that simply won't compile if used
> >>>>>>>>>> incorrectly. Can we build such an API as a method chain starting
> >>>>>>>>>> from a KStream object? There is a huge cost difference between
> >>>>>>>>>> runtime and compile-time errors. Even if a failure is uncovered
> >>>>>>>>>> instantly by unit tests, it costs the project more than a
> >>>>>>>>>> compilation failure.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>>
> >>>>>>>>>> Ivan
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
> >>>>>>>>>>> Ivan,
> >>>>>>>>>>>
> >>>>>>>>>>> Good point about the terminal operation being required.  But is
> >>>>>>>>>>> that really such a bad thing?  If the user doesn't want a
> >>>>>>>>>>> defaultBranch they can call some other terminal method
> >>>>>>>>>>> (noDefaultBranch()?) just as easily.  In fact I think it creates
> >>>>>>>>>>> an opportunity for a nicer API - a user could specify a terminal
> >>>>>>>>>>> method that assumes nothing will reach the default branch,
> >>>>>>>>>>> throwing an exception if such a case occurs.  That seems like an
> >>>>>>>>>>> improvement over the current branch() API, which allows for the
> >>>>>>>>>>> more subtle behavior of records unexpectedly getting dropped.
> >>>>>>>>>>>
> >>>>>>>>>>> The need for a terminal operation certainly has to be well
> >>>>>>>>>>> documented, but it would be fairly easy for the
> >>>>>>>>>>> InternalTopologyBuilder to track dangling branches that haven't
> >>>>>>>>>>> been terminated and raise a clear error before it becomes an
> >>>>>>>>>>> issue.  Especially now that there is a "build step" where the
> >>>>>>>>>>> topology is actually wired up, when StreamsBuilder.build() is
> >>>>>>>>>>> called.
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's
> >>>>>>>>>>> critical to allow users to do other operations on the input
> >>>>>>>>>>> stream.  With the fluent solution, it ought to work the same way
> >>>>>>>>>>> all other operations do - if you want to process off the original
> >>>>>>>>>>> KStream multiple times, you just need the stream as a variable so
> >>>>>>>>>>> you can call as many operations on it as you desire.
> >>>>>>>>>>>
> >>>>>>>>>>> Thoughts?
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Paul
> >>>>>>>>>>>
> >>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hello Paul,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm afraid this won't work because we do not always need the
> >>>>>>>>>>>> defaultBranch. And without a terminal operation we don't know
> >>>>>>>>>>>> when to finalize and build the 'branch switch'.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do
> >>>>>>>>>>>> something more with the original branch after branching.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I understand your point that the need for special object
> >>>>>>>>>>>> construction contrasts with the fluency of most KStream methods.
> >>>>>>>>>>>> But here we have a special case: we build the switch to split
> >>>>>>>>>>>> the flow, so I think this is still idiomatic.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Ivan
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
> >>>>>>>>>>>>> Ivan,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the
> >>>>>>>>>>>>> onTopOf() mechanism a little confusing since it contrasts with
> >>>>>>>>>>>>> the fluency of other KStream method calls.  Ideally I'd like to
> >>>>>>>>>>>>> just call a method on the stream so it still reads top to
> >>>>>>>>>>>>> bottom if the branch cases are defined fluently.  I think the
> >>>>>>>>>>>>> addBranch(predicate, handleCase) is very nice and the right way
> >>>>>>>>>>>>> to do things, but what if we flipped around how we specify the
> >>>>>>>>>>>>> source stream.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Like:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream.branch()
> >>>>>>>>>>>>>            .addBranch(predicate1, this::handle1)
> >>>>>>>>>>>>>            .addBranch(predicate2, this::handle2)
> >>>>>>>>>>>>>            .defaultBranch(this::handleDefault);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or
> >>>>>>>>>>>>> something, which is added to by addBranch() and terminated by
> >>>>>>>>>>>>> defaultBranch() (which returns void).  This is obviously
> >>>>>>>>>>>>> incompatible with the current API, so the new stream.branch()
> >>>>>>>>>>>>> would have to have a different name, but that seems like a
> >>>>>>>>>>>>> fairly small problem - we could call it something like
> >>>>>>>>>>>>> branched() or branchedStreams() and deprecate the old API.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Does this satisfy the motivations of your KIP?  It seems like
> >>>>>>>>>>>>> it does to me, allowing for clear in-line branching while also
> >>>>>>>>>>>>> allowing you to dynamically build branches off of
> >>>>>>>>>>>>> KBranchedStreams if desired.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Paul
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
> >>>>>>>>>>>>> <iponoma...@mail.ru.invalid> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Bill,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thank you for your reply!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This is how I usually do it:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
> >>>>>>>>>>>>>>            ks.filter(....).mapValues(...)
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
> >>>>>>>>>>>>>>            ks.selectKey(...).groupByKey()...
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> ......
> >>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
> >>>>>>>>>>>>>>       .addBranch(predicate1, this::handleFirstCase)
> >>>>>>>>>>>>>>       .addBranch(predicate2, this::handleSecondCase)
> >>>>>>>>>>>>>>       .onTopOf(....)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Ivan
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
> >>>>>>>>>>>>>>> Hi Ivan,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have one question, the KafkaStreamsBrancher takes a Consumer
> >>>>>>>>>>>>>>> as a second argument which returns nothing, and the example in
> >>>>>>>>>>>>>>> the KIP shows each stream from the branch using a terminal
> >>>>>>>>>>>>>>> node (KafkaStreams#to() in this case).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case
> >>>>>>>>>>>>>>> where the user has created a branch but wants to continue
> >>>>>>>>>>>>>>> processing and not necessarily use a terminal node on the
> >>>>>>>>>>>>>>> branched stream immediately?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For example, using today's logic as is if we had something
> >>>>>>>>>>>>>>> like this:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> KStream<String, String>[] branches =
> >>>>>>>>>>>>>>>     originalStream.branch(predicate1, predicate2);
> >>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
> >>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> All,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Here's the original message:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a
> >>>>>>>>>>>>>>>> look at the KIP if you can, I would appreciate any feedback :)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> KIP-418:
> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>>>>>>>>>>>>>>> JIRA KAFKA-5488:
> >>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5488
> >>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Ivan Ponomarev
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >
>
>
