Hi Ivan,

Thanks for the update.

FWIW, I agree with Matthias that the current "start branching" operator is
confusing when named the same way as the actual branches. "Split" seems like
a good name. Alternatively, we can do without a "start branching" operator at
all, and just do:

  stream
    .branch(Predicate)
    .branch(Predicate)
    .defaultBranch();

Tentatively, I think that this branching operation should be terminal. That
way, we don't create ambiguity about how to use it. That is, `branch` should
return `KBranchedStream`, while `defaultBranch` is `void`, to enforce that it
comes last, and that there is only one definition of the default branch.
Potentially, we should log a warning if there's no default, and additionally
log a warning (or throw an exception) if a record falls through with no
default.

Thoughts?

Thanks,
-John

On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote:

> Thanks for updating the KIP and your answers.
>
> > this is to make the name similar to String#split
> > that also returns an array, right?
>
> The intent was to avoid name duplication. The return type should _not_
> be an array.
>
> The current proposal is
>
>   stream.branch()
>     .branch(Predicate)
>     .branch(Predicate)
>     .defaultBranch();
>
> IMHO, this reads a little odd, because the first `branch()` does not
> take any parameters and has different semantics than the later
> `branch()` calls. Note that from the code snippet above, it's hidden
> that the first call is `KStream#branch()` while the others are
> `KBranchedStream#branch()`, which makes reading the code harder.
>
> Because I suggested to rename `addBranch()` -> `branch()`, I thought it
> might be better to also rename `KStream#branch()` to avoid the naming
> overlap that seems to be confusing. The following reads much cleaner to me:
>
>   stream.split()
>     .branch(Predicate)
>     .branch(Predicate)
>     .defaultBranch();
>
> Maybe there is a better alternative to `split()` though to avoid the
> naming overlap.
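
To make the chained shape above concrete, here is a minimal sketch of a
`split()` / `branch(...)` / `defaultBranch(...)` chain in which only the last
call returns `void`. It is a toy model over an in-memory `List`, not Kafka
Streams code: `ToySplitter`, its method signatures, and the demo values are all
hypothetical, and the `Consumer` second argument is an assumption borrowed from
the `addBranch(predicate, consumer)` form quoted further down in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy model only: splits an in-memory list, NOT a Kafka KStream.
// ToySplitter and its methods are hypothetical names for illustration.
final class ToySplitter<T> {
    private final List<T> remaining;

    private ToySplitter(List<T> source) {
        this.remaining = new ArrayList<>(source);
    }

    // Stands in for the proposed split(): the single entry point for branching.
    static <T> ToySplitter<T> split(List<T> source) {
        return new ToySplitter<>(source);
    }

    // Hands every still-unclaimed element that matches the predicate to the
    // handler, then returns this so further branches can be chained.
    ToySplitter<T> branch(Predicate<T> predicate, Consumer<List<T>> handler) {
        List<T> matched = new ArrayList<>();
        Iterator<T> it = remaining.iterator();
        while (it.hasNext()) {
            T item = it.next();
            if (predicate.test(item)) {
                matched.add(item);
                it.remove();
            }
        }
        handler.accept(matched);
        return this;
    }

    // Terminal step: returns void, so the compiler rejects anything chained after it.
    void defaultBranch(Consumer<List<T>> handler) {
        handler.accept(new ArrayList<>(remaining));
        remaining.clear();
    }
}

final class ToySplitterDemo {
    public static void main(String[] args) {
        List<Integer> evens = new ArrayList<>();
        List<Integer> large = new ArrayList<>();
        List<Integer> rest = new ArrayList<>();

        ToySplitter.split(Arrays.asList(1, 2, 3, 4, 11, 12))
            .branch(n -> n % 2 == 0, evens::addAll)  // first matching branch wins
            .branch(n -> n > 10, large::addAll)
            .defaultBranch(rest::addAll);            // must come last

        System.out.println(evens + " " + large + " " + rest);
    }
}
```

Because `defaultBranch` returns `void`, writing `.defaultBranch(...).branch(...)`
is a compile-time error in this sketch, which is the "comes last, defined once"
property described above. Whether a missing default should warn or throw is left
open here, as it is in the discussion.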
>
> > 'default' is, however, a reserved word, so unfortunately we cannot have
> > a method with such name :-)
>
> Bummer. Didn't consider this. Maybe we can still come up with a short name?
>
>
> Can you add the interface `KBranchedStream` to the KIP with all its
> methods? It will be part of the public API and should be contained in the
> KIP. For example, it's unclear atm what the return type of
> `defaultBranch()` is.
>
>
> You did not comment on the idea to add a `KBranchedStream#get(int index)
> -> KStream` method to get the individually branched KStreams. Would be
> nice to get your feedback about it. It seems you suggest that users
> would need to write custom utility code otherwise, to access them. We
> should discuss the pros and cons of both approaches. It feels
> "incomplete" to me atm, if the API has no built-in support to get the
> branched KStreams directly.
>
>
> -Matthias
>
>
> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > Hi all!
> >
> > I have updated KIP-418 according to the new vision.
> >
> > Matthias, thanks for your comment!
> >
> >> Renaming KStream#branch() -> #split()
> >
> > I can see your point: this is to make the name similar to String#split
> > that also returns an array, right? But is it worth the loss of backwards
> > compatibility? We can have overloaded branch() as well without affecting
> > the existing code. Maybe the old array-based `branch` method should be
> > deprecated, but this is a subject for discussion.
> >
> >> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >
> > Totally agree with the 'addBranch->branch' rename. 'default' is, however, a
> > reserved word, so unfortunately we cannot have a method with such name :-)
> >
> >> defaultBranch() does take a `Predicate` as argument, but I think that
> >> is not required?
> >
> > Absolutely! I think that was just a copy-paste error or something.
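
On the two Java points raised just above (an overloaded `branch()` living next
to the array-based one, and `default` being unusable as a method name), a small
self-contained sketch may help. `ToyStream` and `ToyBranched` are hypothetical
stand-ins, not the real Kafka Streams types; the only claim is about how the
Java compiler resolves the two overloads.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for the object a no-argument branch() would return.
final class ToyBranched<T> { }

// Hypothetical stand-in for KStream; models only the two overloads.
final class ToyStream<T> {

    // Models the existing array-returning method: one result per predicate.
    @SafeVarargs
    final ToyStream<T>[] branch(Predicate<T>... predicates) {
        @SuppressWarnings("unchecked")
        ToyStream<T>[] result = (ToyStream<T>[]) new ToyStream[predicates.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = new ToyStream<>();
        }
        return result;
    }

    // Models the proposed no-argument overload with a different return type.
    // A call written as branch() binds here, because the compiler only falls
    // back to variable-arity methods when no fixed-arity method applies.
    ToyBranched<T> branch() {
        return new ToyBranched<>();
    }

    // void default() { }   // would not compile: `default` is a Java keyword
}

final class ToyOverloadDemo {
    public static void main(String[] args) {
        ToyStream<String> stream = new ToyStream<>();

        ToyBranched<String> fluent = stream.branch();              // no-arg overload
        ToyStream<String>[] array =
            stream.branch(s -> s.isEmpty(), s -> true);            // varargs overload

        System.out.println(fluent.getClass().getSimpleName() + " / " + array.length);
    }
}
```

Existing call sites that pass one or more predicates keep compiling against the
varargs method, so in this model the overload itself is source compatible; only
a rename or removal of the array-based method would break callers.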
> >
> > Dear colleagues,
> >
> > please review the new version of the KIP and Paul's PR
> > (https://github.com/apache/kafka/pull/6512)
> >
> > Any new suggestions/objections?
> >
> > Regards,
> >
> > Ivan
> >
> >
> > 11.04.2019 11:47, Matthias J. Sax wrote:
> >> Thanks for driving the discussion of this KIP. It seems that everybody
> >> agrees that the current branch() method using arrays is not optimal.
> >>
> >> I had a quick look into the PR and I like the overall proposal. There
> >> are some minor things we need to consider. I would recommend the
> >> following renaming:
> >>
> >> KStream#branch() -> #split()
> >> KBranchedStream#addBranch() -> BranchingKStream#branch()
> >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> >>
> >> It's just a suggestion to get slightly shorter method names.
> >>
> >> In the current PR, defaultBranch() does take a `Predicate` as argument,
> >> but I think that is not required?
> >>
> >> Also, we should consider KIP-307, which was recently accepted and is
> >> currently being implemented:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> >>
> >> I.e., we should add overloads that accept a `Named` parameter.
> >>
> >>
> >> For the issue that the created `KStream` objects are in different scopes:
> >> could we extend `KBranchedStream` with a `get(int index)` method that
> >> returns the corresponding "branched" result `KStream` object? Maybe the
> >> second argument of `addBranch()` should not be a `Consumer<KStream>` but
> >> a `Function<KStream,KStream>`, and `get()` could return whatever the
> >> `Function` returns?
> >>
> >>
> >> Finally, I would also suggest to update the KIP with the current
> >> proposal. That makes it easier to review.
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 3/31/19 12:22 PM, Paul Whalen wrote:
> >>> Ivan,
> >>>
> >>> I'm a bit of a novice here as well, but I think it makes sense for you to
> >>> revise the KIP and continue the discussion. Obviously we'll need some
> >>> buy-in from committers that have actual binding votes on whether the KIP
> >>> could be adopted. It would be great to hear if they think this is a good
> >>> idea overall. I'm not sure if that happens just by starting a vote, or if
> >>> there is generally some indication of interest beforehand.
> >>>
> >>> That being said, I'll continue the discussion a bit: assuming we do move
> >>> forward with the solution of "stream.branch() returns KBranchedStream", do we
> >>> deprecate "stream.branch(...) returns KStream[]"? I would favor
> >>> deprecating, since having two mutually exclusive APIs that accomplish the
> >>> same thing is confusing, especially when they're fairly similar anyway. We
> >>> just need to be sure we're not making something impossible/difficult that
> >>> is currently possible/easy.
> >>>
> >>> Regarding my PR - I think the general structure would work, it's just a
> >>> little sloppy overall in terms of naming and clarity. In particular,
> >>> passing in the "predicates" and "children" lists, which get modified in
> >>> KBranchedStream but read from all the way in KStreamLazyBranch, is a bit
> >>> complicated to follow.
> >>>
> >>> Thanks,
> >>> Paul
> >>>
> >>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> >>>
> >>>> Hi Paul!
> >>>>
> >>>> I read your code carefully and now I am fully convinced: your proposal
> >>>> looks better and should work. We just have to document the crucial fact
> >>>> that KStream consumers are invoked as they're added. And then it's all
> >>>> going to be very nice.
> >>>>
> >>>> What shall we do now? I should re-write the KIP and resume the
> >>>> discussion here, right?
> >>>>
> >>>> Why are you saying that your PR 'should not be even a starting point if
> >>>> we go in this direction'? To me it looks like a good starting point. But
> >>>> as a novice in this project I might miss some important details.
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan
> >>>>
> >>>>
> >>>> 28.03.2019 17:38, Paul Whalen wrote:
> >>>>> Ivan,
> >>>>>
> >>>>> Maybe I’m missing the point, but I believe the stream.branch() solution
> >>>>> supports this. The couponIssuer::set* consumers will be invoked as they’re
> >>>>> added, not during streamsBuilder.build(). So the user still ought to be
> >>>>> able to call couponIssuer.coupons() afterward and depend on the branched
> >>>>> streams having been set.
> >>>>>
> >>>>> The issue I mean to point out is that it is hard to access the branched
> >>>>> streams in the same scope as the original stream (that is, not inside the
> >>>>> couponIssuer), which is a problem with both proposed solutions. It can be
> >>>>> worked around though.
> >>>>>
> >>>>> [Also, great to hear additional interest in 401, I’m excited to hear
> >>>>> your thoughts!]
> >>>>>
> >>>>> Paul
> >>>>>
> >>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
> >>>>>>
> >>>>>> Hi Paul!
> >>>>>>
> >>>>>> The idea to postpone the wiring of branches to the
> >>>>>> streamsBuilder.build() also looked great to me at first glance, but ---
> >>>>>>
> >>>>>>> the newly branched streams are not available in the same scope as each
> >>>>>>> other. That is, if we wanted to merge them back together again I don't see
> >>>>>>> a way to do that.
> >>>>>>
> >>>>>> You just took the words right out of my mouth, I was just going to
> >>>>>> write in detail about this issue.
> >>>>>>
> >>>>>> Consider the example from Bill's book, p. 101: say we need to identify
> >>>>>> customers who have bought coffee and made a purchase in the electronics
> >>>>>> store to give them coupons.
> >>>>>>
> >>>>>> This is the code I usually write under these circumstances using my
> >>>>>> 'brancher' class:
> >>>>>>
> >>>>>> @Setter
> >>>>>> class CouponIssuer{
> >>>>>>    private KStream<....> coffePurchases;
> >>>>>>    private KStream<....> electronicsPurchases;
> >>>>>>
> >>>>>>    KStream<...> coupons(){
> >>>>>>        return coffePurchases.join(electronicsPurchases...)...whatever
> >>>>>>
> >>>>>>        /*In the real world the code here can be complex, so creation of
> >>>>>>        a separate CouponIssuer class is fully justified, in order to separate
> >>>>>>        classes' responsibilities.*/
> >>>>>>    }
> >>>>>> }
> >>>>>>
> >>>>>> CouponIssuer couponIssuer = new CouponIssuer();
> >>>>>>
> >>>>>> new KafkaStreamsBrancher<....>()
> >>>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
> >>>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
> >>>>>>     .onTopOf(transactionStream);
> >>>>>>
> >>>>>> /*Alas, this won't work if we're going to wire up everything later,
> >>>>>> without the terminal operation!!!*/
> >>>>>> couponIssuer.coupons()...
> >>>>>>
> >>>>>> Does this make sense? In order to properly initialize the CouponIssuer
> >>>>>> we need the terminal operation to be called before streamsBuilder.build()
> >>>>>> is called.
> >>>>>>
> >>>>>> [BTW Paul, I just found out that your KIP-401 is essentially the next
> >>>>>> KIP I was going to write here. I have some thoughts based on my experience,
> >>>>>> so I will join the discussion on KIP-401 soon.]
> >>>>>>
> >>>>>> Regards,
> >>>>>>
> >>>>>> Ivan
> >>>>>>
> >>>>>> 28.03.2019 6:29, Paul Whalen wrote:
> >>>>>>> Ivan,
> >>>>>>>
> >>>>>>> I tried to make a very rough proof of concept of a fluent API based off of
> >>>>>>> KStream here (https://github.com/apache/kafka/pull/6512), and I think I
> >>>>>>> succeeded at removing both cons.
> >>>>>>>
> >>>>>>>    - Compatibility: I was incorrect earlier about compatibility issues,
> >>>>>>>    there aren't any direct ones. I was unaware that Java is smart enough to
> >>>>>>>    distinguish between a branch(varargs...) returning one thing and branch()
> >>>>>>>    with no arguments returning another thing.
> >>>>>>>    - Requiring a terminal method: We don't actually need it. We can just
> >>>>>>>    build up the branches in the KBranchedStream, which shares its state with the
> >>>>>>>    ProcessorSupplier that will actually do the branching. It's not terribly
> >>>>>>>    pretty in its current form, but I think it demonstrates its feasibility.
> >>>>>>>
> >>>>>>> To be clear, I don't think that pull request should be final or even a
> >>>>>>> starting point if we go in this direction, I just wanted to see how
> >>>>>>> challenging it would be to get the API working.
> >>>>>>>
> >>>>>>> I will say though, that I'm not sure the existing solution could be
> >>>>>>> deprecated in favor of this, which I had originally suggested was a
> >>>>>>> possibility. The reason is that the newly branched streams are not
> >>>>>>> available in the same scope as each other. That is, if we wanted to merge
> >>>>>>> them back together again I don't see a way to do that. The KIP proposal
> >>>>>>> has the same issue, though - all this means is that for either solution,
> >>>>>>> deprecating the existing branch(...) is not on the table.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Paul
> >>>>>>>
> >>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> >>>>>>>>
> >>>>>>>> OK, let me summarize what we have discussed up to this point.
> >>>>>>>>
> >>>>>>>> First, it seems that it's commonly agreed that the branch API needs
> >>>>>>>> improvement. Motivation is given in the KIP.
> >>>>>>>>
> >>>>>>>> There are two potential ways to do it:
> >>>>>>>>
> >>>>>>>> 1. (as originally proposed)
> >>>>>>>>
> >>>>>>>> new KafkaStreamsBrancher<..>()
> >>>>>>>>     .branch(predicate1, ks ->..)
> >>>>>>>>     .branch(predicate2, ks->..)
> >>>>>>>>     .defaultBranch(ks->..) //optional
> >>>>>>>>     .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
> >>>>>>>>
> >>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
> >>>>>>>> all the necessary ingredients are provided.
> >>>>>>>>
> >>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with the
> >>>>>>>> fluency of other KStream methods.
> >>>>>>>>
> >>>>>>>> 2. (as Paul proposes)
> >>>>>>>>
> >>>>>>>> stream
> >>>>>>>>     .branch(predicate1, ks ->...)
> >>>>>>>>     .branch(predicate2, ks->...)
> >>>>>>>>     .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
> >>>>>>>>                             //noDefault() return void
> >>>>>>>>
> >>>>>>>> PROS: Generally follows the way the KStream interface is defined.
> >>>>>>>>
> >>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and
> >>>>>>>> noDefault()). And for a user it is very easy to miss the fact that one
> >>>>>>>> of the terminal methods should be called. If these methods are not
> >>>>>>>> called, we can throw an exception at runtime.
> >>>>>>>>
> >>>>>>>> Colleagues, what are your thoughts? Can we do better?
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>>
> >>>>>>>> Ivan
> >>>>>>>>
> >>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
> >>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
> >>>>>>>>>> Paul,
> >>>>>>>>>>
> >>>>>>>>>> I see your point when you are talking about
> >>>>>>>>>> stream..branch..branch...default..
> >>>>>>>>>>
> >>>>>>>>>> Still, I believe that this cannot be implemented the easy way.
> >>>>>>>>>> Maybe we all should think further.
> >>>>>>>>>>
> >>>>>>>>>> Let me comment on two of your ideas.
> >>>>>>>>>>
> >>>>>>>>>>> user could specify a terminal method that assumes nothing will reach
> >>>>>>>>>>> the default branch, throwing an exception if such a case occurs.
> >>>>>>>>>>
> >>>>>>>>>> 1) OK, apparently this should not be the only option besides
> >>>>>>>>>> `default`, because there are scenarios when we want to just silently
> >>>>>>>>>> drop the messages that didn't match any predicate. 2) Throwing an
> >>>>>>>>>> exception in the middle of data flow processing looks like a bad idea.
> >>>>>>>>>> In the stream processing paradigm, I would prefer to emit a special
> >>>>>>>>>> message to a dedicated stream. This is exactly where `default` can be
> >>>>>>>>>> used.
> >>>>>>>>>>
> >>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track
> >>>>>>>>>>> dangling branches that haven't been terminated and raise a clear error
> >>>>>>>>>>> before it becomes an issue.
> >>>>>>>>>>
> >>>>>>>>>> You mean a runtime exception, when the program is compiled and run?
> >>>>>>>>>> Well, I'd prefer an API that simply won't compile if used
> >>>>>>>>>> incorrectly. Can we build such an API as a method chain starting from
> >>>>>>>>>> the KStream object? There is a huge cost difference between runtime and
> >>>>>>>>>> compile-time errors. Even if a failure surfaces instantly in unit
> >>>>>>>>>> tests, it costs more for the project than a compilation failure.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>>
> >>>>>>>>>> Ivan
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
> >>>>>>>>>>> Ivan,
> >>>>>>>>>>>
> >>>>>>>>>>> Good point about the terminal operation being required. But is that
> >>>>>>>>>>> really such a bad thing? If the user doesn't want a defaultBranch they can
> >>>>>>>>>>> call some other terminal method (noDefaultBranch()?) just as easily. In
> >>>>>>>>>>> fact I think it creates an opportunity for a nicer API - a user could specify a
> >>>>>>>>>>> terminal method that assumes nothing will reach the default branch,
> >>>>>>>>>>> throwing an exception if such a case occurs. That seems like an
> >>>>>>>>>>> improvement over the current branch() API, which allows for the more
> >>>>>>>>>>> subtle behavior of records unexpectedly getting dropped.
> >>>>>>>>>>>
> >>>>>>>>>>> The need for a terminal operation certainly has to be well
> >>>>>>>>>>> documented, but it would be fairly easy for the InternalTopologyBuilder to track
> >>>>>>>>>>> dangling branches that haven't been terminated and raise a clear error before it
> >>>>>>>>>>> becomes an issue. Especially now that there is a "build step" where the
> >>>>>>>>>>> topology is actually wired up, when StreamsBuilder.build() is called.
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's
> >>>>>>>>>>> critical to allow users to do other operations on the input stream. With the
> >>>>>>>>>>> fluent solution, it ought to work the same way all other operations do - if you
> >>>>>>>>>>> want to process off the original KStream multiple times, you just
> >>>>>>>>>>> need the stream as a variable so you can call as many operations on it as you
> >>>>>>>>>>> desire.
> >>>>>>>>>>>
> >>>>>>>>>>> Thoughts?
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Paul
> >>>>>>>>>>>
> >>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hello Paul,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I'm afraid this won't work because we do not always need the
> >>>>>>>>>>>> defaultBranch. And without a terminal operation we don't know when to
> >>>>>>>>>>>> finalize and build the 'branch switch'.
> >>>>>>>>>>>>
> >>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do something
> >>>>>>>>>>>> more with the original branch after branching.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I understand your point that the need for special object construction
> >>>>>>>>>>>> contrasts with the fluency of most KStream methods. But here we have a
> >>>>>>>>>>>> special case: we build the switch to split the flow, so I think this is
> >>>>>>>>>>>> still idiomatic.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Regards,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Ivan
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
> >>>>>>>>>>>>> Ivan,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the
> >>>>>>>>>>>>> onTopOf() mechanism a little confusing since it contrasts with the fluency of other
> >>>>>>>>>>>>> KStream method calls. Ideally I'd like to just call a method on the
> >>>>>>>>>>>>> stream so it still reads top to bottom if the branch cases are defined
> >>>>>>>>>>>>> fluently. I think the addBranch(predicate, handleCase) is very nice and the
> >>>>>>>>>>>>> right way to do things, but what if we flipped around how we specify the source
> >>>>>>>>>>>>> stream.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Like:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> stream.branch()
> >>>>>>>>>>>>>         .addBranch(predicate1, this::handle1)
> >>>>>>>>>>>>>         .addBranch(predicate2, this::handle2)
> >>>>>>>>>>>>>         .defaultBranch(this::handleDefault);
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or
> >>>>>>>>>>>>> something, which is added to by addBranch() and terminated by defaultBranch()
> >>>>>>>>>>>>> (which returns void). This is obviously incompatible with the current
> >>>>>>>>>>>>> API, so the new stream.branch() would have to have a different name, but that
> >>>>>>>>>>>>> seems like a fairly small problem - we could call it something like
> >>>>>>>>>>>>> branched() or branchedStreams() and deprecate the old API.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Does this satisfy the motivations of your KIP? It seems like it
> >>>>>>>>>>>>> does to me, allowing for clear in-line branching while also allowing you to
> >>>>>>>>>>>>> dynamically build branches off of KBranchedStreams if desired.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Paul
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
> >>>>>>>>>>>>> <iponoma...@mail.ru.invalid> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Bill,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thank you for your reply!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This is how I usually do it:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
> >>>>>>>>>>>>>>         ks.filter(....).mapValues(...)
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
> >>>>>>>>>>>>>>         ks.selectKey(...).groupByKey()...
> >>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> ......
> >>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
> >>>>>>>>>>>>>>        .addBranch(predicate1, this::handleFirstCase)
> >>>>>>>>>>>>>>        .addBranch(predicate2, this::handleSecondCase)
> >>>>>>>>>>>>>>        .onTopOf(....)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Ivan
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
> >>>>>>>>>>>>>>> Hi Ivan,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a Consumer as a
> >>>>>>>>>>>>>>> second argument which returns nothing, and the example in the KIP shows
> >>>>>>>>>>>>>>> each stream from the branch using a terminal node (KStream#to()
> >>>>>>>>>>>>>>> in this case).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case
> >>>>>>>>>>>>>>> where the user has created a branch but wants to continue processing and not
> >>>>>>>>>>>>>>> necessarily use a terminal node on the branched stream immediately?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For example, using today's logic as is, if we had something like
> >>>>>>>>>>>>>>> this:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> KStream<String, String>[] branches =
> >>>>>>>>>>>>>>>         originalStream.branch(predicate1, predicate2);
> >>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
> >>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>> Bill
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com>
> >>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> All,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Here's the original message:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a look at the
> >>>>>>>>>>>>>>>> KIP if you can, I would appreciate any feedback :)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> KIP-418:
> >>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> >>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Ivan Ponomarev