Hello everyone,

will someone please take a look at the reworked KIP?

I believe that now it follows design principles and takes into account all the arguments discussed here.


Regards,

Ivan


23.04.2020 2:45, Ivan Ponomarev пишет:
Hi,

I have read the John's "DSL design principles" and have completely rewritten the KIP, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream


This version includes all the previous discussion results and follows the design principles, with one exception.

The exception is

branch(Predicate<K,V> predicate, Branched<K,V> branched)

which formally violates 'no more than one parameter' rule, but I think here it is justified.

We must provide a predicate for a branch and don't need to provide one for the default branch. Thus for both operations we may use a single Branched parameter class, with an extra method parameter for `branch`.

Since predicate is a natural, necessary part of a branch, no 'proliferation of overloads, deprecations, etc.' is expected here as it is said in the rationale for the 'single parameter rule'.

WDYT, is this KIP mature enough to begin voting?

Regards,

Ivan

21.04.2020 2:09, Matthias J. Sax пишет:
Ivan,

no worries about getting side tracked. Glad to have you back!

The DSL improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on this.

Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
-- might be worth to checkout.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:
Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June
2019. My project work was very intensive then and it didn't leave me
spare time. But I think I must finish this, because we invested
substantial effort into this discussion and I'm not feel entitled to
propose other things before this one is finalized.

During these months I proceeded with writing and reviewing Kafka
Streams-related code. Every time I needed branching, Spring-Kafka's
KafkaStreamBrancher class of my invention (the original idea for this
KIP) worked for me -- that's another reason why I gave up pushing the
KIP forward. When I was coming across the problem with the scope of
branches, I worked around it this way:

AtomicReference<KStream<...>> result = new AtomicReference<>();
new KafkaStreamBrancher<....>()
     .branch(....)
     .defaultBranch(result::set)
     .onTopOf(someStream);
result.get()...


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from
May, 24th 2019. Let me quote it:

KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer<KStream>)
   -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
   -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Cosumer<KStream>)
   -> Map<String,KStream>
// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function<KStream,KStream>, String)
   -> Map<String,KStream>
// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function<KStream,KStream>)
   -> Map<String,KStream>
// return map of all names sub-stream into current scope
KBranchedStream#noDefaultBranch()
   -> Map<String,KStream>

I believe this would satisfy everyone. Optional names seems to be a good
idea: when you don't need to have the branches in the same scope, you
just don't use names and you don't risk making your code brittle. Or,
you might want to add names just for debugging purposes. Or, finally,
you might use the returned Map to have the named branches in the
original scope.

There also was an input from John Roesler on June 4th, 2019, who
suggested using Named class. I can't comment on this. The idea seems
reasonable, but in this matter I'd rather trust people who are more
familiar with Streams API design principles than me.

Regards,

Ivan



08.10.2019 1:38, Matthias J. Sax пишет:
I am moving this KIP into "inactive status". Feel free to resume the KIP
at any point.

If anybody else is interested in picking up this KIP, feel free to do so.



-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:
Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get best-of-both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:
Thanks for the input John!

under your suggestion, it seems that the name is required

If you want to get the `KStream` as part of the `Map` back using a
`Function`, yes. If you follow the "embedded chaining" pattern using a
`Consumer`, no.

Allowing for a default name via `split()` can of course be done.
Similarly, using `Named` instead of `String` is possible.

I wanted to sketch out a high level proposal to merge both patterns
only. Your suggestions to align the new API with the existing API make
totally sense.



One follow up question: Would `Named` be optional or required in
`split()` and `branch()`? It's unclear from your example.

If both are mandatory, what do we gain by it? The returned `Map` only
contains the corresponding branches, so why should we prefix all of
them? If only `Named` is mandatory in `branch()`, but optional in
`split()`, the same question raises?

Requiring `Named` in `split()` seems only to make sense, if `Named` is optional in `branch()` and we generate `-X` suffix using a counter for
different branch name. However, this might lead to the problem of
changing names if branches are added/removed. Also, how would the names
be generated if `Consumer` is mixed in (ie, not all branches are
returned in the `Map`).

If `Named` is optional for both, it could happen that a user misses to
specify a name for a branch what would lead to runtime issues.


Hence, I am actually in favor to not allow a default name but keep
`split()` without parameter and make `Named` in `branch()` required
if a
`Function` is used. This makes it explicit to the user that
specifying a
name is required if a `Function` is used.



About

KBranchedStream#branch(BranchConfig)

I don't think that the branching predicate is a configuration and hence
would not include it in a configuration object.

      withChain(...);

Similar, `withChain()` (that would only take a `Consumer`?) does not
seem to be a configuration. We can also not prevent a user to call
`withName()` in combination of `withChain()` what does not make sense
IMHO. We could of course throw an RTE but not have a compile time check seems less appealing. Also, it could happen that neither `withChain()`
not `withName()` is called and the branch is missing in the returned
`Map` what lead to runtime issues, too.

Hence, I don't think that we should add `BranchConfig`. A config object
is helpful if each configuration can be set independently of all
others,
but this seems not to be the case here. If we add new configuration
later, we can also just move forward by deprecating the methods that
accept `Named` and add new methods that accepted `BranchConfig` (that
would of course implement `Named`).


Thoughts?


@Ivan, what do you think about the general idea to blend the two main
approaches of returning a `Map` plus an "embedded chaining"?



-Matthias



On 6/4/19 10:33 AM, John Roesler wrote:
Thanks for the idea, Matthias, it does seem like this would satisfy
everyone. Returning the map from the terminal operations also solves
the problem of merging/joining the branched streams, if we want to add
support for the compliment later on.

Under your suggestion, it seems that the name is required. Otherwise,
we wouldn't have keys for the map to return. I this this is actually
not too bad, since experience has taught us that, although names for
operations are not required to define stream processing logic, it does significantly improve the operational experience when you can map the
topology, logs, metrics, etc. back to the source code. Since you
wouldn't (have to) reference the name to chain extra processing onto
the branch (thanks to the second argument), you can avoid the
"unchecked name" problem that Ivan pointed out.

In the current implementation of Branch, you can name the branch
operator itself, and then all the branches get index-suffixed names
built from the branch operator name. I guess under this proposal, we
could naturally append the branch name to the branching operator name,
like this:

     stream.split(Named.withName("mysplit")) //creates node "mysplit"
                .branch(..., ..., "abranch") // creates node
"mysplit-abranch"
                .defaultBranch(...) // creates node "mysplit-default"

It does make me wonder about the DSL syntax itself, though.

We don't have a defined grammar, so there's plenty of room to debate
the "best" syntax in the context of each operation, but in general,
the KStream DSL operators follow this pattern:

      operator(function, config_object?) OR operator(config_object)

where config_object is often just Named in the "function" variant.
Even when the config_object isn't a Named, but some other config
class, that config class _always_ implements NamedOperation.

Here, we're introducing a totally different pattern:

    operator(function, function, string)

where the string is the name.
My first question is whether the name should instead be specified with
the NamedOperation interface.

My second question is whether we should just roll all these arguments
up into a config object like:

     KBranchedStream#branch(BranchConfig)

     interface BranchConfig extends NamedOperation {
      withPredicate(...);
      withChain(...);
      withName(...);
    }

Although I guess we'd like to call BranchConfig something more like
"Branched", even if I don't particularly like that pattern.

This makes the source code a little noisier, but it also makes us more future-proof, as we can deal with a wide range of alternatives purely in the config interface, and never have to deal with adding overloads
to the KBranchedStream if/when we decide we want the name to be
optional, or the KStream->KStream to be optional.

WDYT?

Thanks,
-John

On Fri, May 24, 2019 at 5:25 PM Michael Drogalis
<michael.droga...@confluent.io> wrote:

Matthias: I think that's pretty reasonable from my point of view.
Good
suggestion.

On Thu, May 23, 2019 at 9:50 PM Matthias J. Sax
<matth...@confluent.io>
wrote:

Interesting discussion.

I am wondering, if we cannot unify the advantage of both approaches:



KStream#split() -> KBranchedStream

// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer<KStream>)
    -> KBranchedStream

// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function<KStream,KStream>, String)
    -> KBranchedStream

// default branch is not easily accessible
// return map of all named sub-stream into current scope
KBranchedStream#default(Cosumer<KStream>)
    -> Map<String,KStream>

// assign custom name to default-branch
// return map of all named sub-stream into current scope
KBranchedStream#default(Function<KStream,KStream>, String)
    -> Map<String,KStream>

// assign a default name for default
// return map of all named sub-stream into current scope
KBranchedStream#defaultBranch(Function<KStream,KStream>)
    -> Map<String,KStream>

// return map of all names sub-stream into current scope
KBranchedStream#noDefaultBranch()
    -> Map<String,KStream>



Hence, for each sub-stream, the user can pick to add a name and
return
the branch "result" to the calling scope or not. The
implementation can
also check at runtime that all returned names are unique. The
returned
Map can be empty and it's also optional to use the Map.

To me, it seems like a good way to get best of both worlds.

Thoughts?



-Matthias




On 5/6/19 5:15 PM, John Roesler wrote:
Ivan,

That's a very good point about the "start" operator in the
dynamic case.
I had no problem with "split()"; I was just questioning the
necessity.
Since you've provided a proof of necessity, I'm in favor of the
"split()" start operator. Thanks!

Separately, I'm interested to see where the present discussion
leads.
I've written enough Javascript code in my life to be suspicious of
nested closures. You have a good point about using method
references (or
indeed function literals also work). It should be validating
that this
was also the JS community's first approach to flattening the
logic when
their nested closure situation got out of hand. Unfortunately, it's
replacing nesting with redirection, both of which disrupt code
readability (but in different ways for different reasons). In other
words, I agree that function references is *the* first-order
solution if
the nested code does indeed become a problem.

However, the history of JS also tells us that function
references aren't
the end of the story either, and you can see that by observing that
there have been two follow-on eras, as they continue trying to
cope with
the consequences of living in such a callback-heavy language.
First, you
have Futures/Promises, which essentially let you convert nested
code to
method-chained code (Observables/FP is a popular variation on
this).
Most lately, you have async/await, which is an effort to apply
language
(not just API) syntax to the problem, and offer the "flattest"
possible
programming style to solve the problem (because you get back to
just one
code block per functional unit).

Stream-processing is a different domain, and Java+KStreams is
nowhere
near as callback heavy as JS, so I don't think we have to take
the JS
story for granted, but then again, I think we can derive some
valuable
lessons by looking sideways to adjacent domains. I'm just
bringing this
up to inspire further/deeper discussion. At the same time, just
like JS,
we can afford to take an iterative approach to the problem.

Separately again, I'm interested in the post-branch merge (and
I'd also
add join) problem that Paul brought up. We can clearly punt on
it, by
terminating the nested branches with sink operators. But is
there a DSL
way to do it?

Thanks again for your driving this,
-John

On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com
<mailto:pgwha...@gmail.com>> wrote:

      Ivan, I’ll definitely forfeit my point on the clumsiness of
the
      branch(predicate, consumer) solution, I don’t see any real
drawbacks
      for the dynamic case.

      IMO the one trade off to consider at this point is the scope
      question. I don’t know if I totally agree that “we rarely
need them
      in the same scope” since merging the branches back together
later
      seems like a perfectly plausible use case that can be a lot
nicer
      when the branched streams are in the same scope. That being
said,
      for the reasons Ivan listed, I think it is overall the better
      solution - working around the scope thing is easy enough if
you need
      to.

      > On May 2, 2019, at 7:00 PM, Ivan Ponomarev
      <iponoma...@mail.ru.invalid> wrote:
      >
      > Hello everyone, thank you all for joining the discussion!
      >
      > Well, I don't think the idea of named branches, be it a
      LinkedHashMap (no other Map will do, because order of
definition
      matters) or `branch` method  taking name and Consumer has more
      advantages than drawbacks.
      >
      > In my opinion, the only real positive outcome from Michael's
      proposal is that all the returned branches are in the same
scope.
      But 1) we rarely need them in the same scope 2) there is a
      workaround for the scope problem, described in the KIP.
      >
      > 'Inlining the complex logic' is not a problem, because we
can use
      method references instead of lambdas. In real world
scenarios you
      tend to split the complex logic to methods anyway, so the
code is
      going to be clean.
      >
      > The drawbacks are strong. The cohesion between predicates
and
      handlers is lost. We have to define predicates in one
place, and
      handlers in another. This opens the door for bugs:
      >
      > - what if we forget to define a handler for a name? or a
name for
      a handler?
      > - what if we misspell a name?
      > - what if we copy-paste and duplicate a name?
      >
      > What Michael propose would have been totally OK if we had
been
      writing the API in Lua, Ruby or Python. In those languages the       "dynamic naming" approach would have looked most concise and       beautiful. But in Java we expect all the problems related to
      identifiers to be eliminated in compile time.
      >
      > Do we have to invent duck-typing for the Java API?
      >
      > And if we do, what advantage are we supposed to get
besides having
      all the branches in the same scope? Michael, maybe I'm
missing your
      point?
      >
      > ---
      >
      > Earlier in this discussion John Roesler also proposed to do
      without "start branching" operator, and later Paul
mentioned that in
      the case when we have to add a dynamic number of branches, the
      current KIP is 'clumsier' compared to Michael's 'Map'
solution. Let
      me address both comments here.
      >
      > 1) "Start branching" operator (I think that *split* is a
good name
      for it indeed) is critical when we need to do a dynamic
branching,
      see example below.
      >
      > 2) No, dynamic branching in current KIP is not clumsy at
all.
      Imagine a real-world scenario when you need one branch per
enum
      value (say, RecordType). You can have something like this:
      >
      > /*John:if we had to start with stream.branch(...) here,
it would
      have been much messier.*/
      > KBranchedStream branched = stream.split();
      >
      > /*Not clumsy at all :-)*/
      > for (RecordType recordType : RecordType.values())
      >             branched = branched.branch((k, v) ->
v.getRecType() ==
      recordType,
      >                     recordType::processRecords);
      >
      > Regards,
      >
      > Ivan
      >
      >
      > 02.05.2019 14:40, Matthias J. Sax пишет:
      >> I also agree with Michael's observation about the core
problem of
      >> current `branch()` implementation.
      >>
      >> However, I also don't like to pass in a clumsy Map
object. My
      thinking
      >> was more aligned with Paul's proposal to just add a name
to each
      >> `branch()` statement and return a `Map<String,KStream>`.
      >>
      >> It makes the code easier to read, and also make the
order of
      >> `Predicates` (that is essential) easier to grasp.
      >>
      >>>>>> Map<String, KStream<K, V>> branches = stream.split()
      >>>>>>    .branch("branchOne", Predicate<K, V>)
      >>>>>>    .branch( "branchTwo", Predicate<K, V>)
      >>>>>>    .defaultBranch("defaultBranch");
      >> An open question is the case for which no
defaultBranch() should
be
      >> specified. Atm, `split()` and `branch()` would return
      `BranchedKStream`
      >> and the call to `defaultBranch()` that returns the `Map` is
mandatory
      >> (what is not the case atm). Or is this actually not a real
problem,
      >> because users can just ignore the branch returned by
      `defaultBranch()`
      >> in the result `Map` ?
      >>
      >>
      >> About "inlining": So far, it seems to be a matter of
personal
      >> preference. I can see arguments for both, but no "killer
      argument" yet
      >> that clearly make the case for one or the other.
      >>
      >>
      >> -Matthias
      >>
      >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
      >>> Perhaps inlining is the wrong terminology. It doesn’t
require
      that a lambda with the full downstream topology be defined
inline -
      it can be a method reference as with Ivan’s original
suggestion.
      The advantage of putting the predicate and its downstream
logic
      (Consumer) together in branch() is that they are required
to be near
      to each other.
      >>>
      >>> Ultimately the downstream code has to live somewhere,
and deep
      branch trees will be hard to read regardless.
      >>>
      >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
      <michael.droga...@confluent.io
      <mailto:michael.droga...@confluent.io>> wrote:
      >>>>
      >>>> I'm less enthusiastic about inlining the branch logic
with its
      downstream
      >>>> functionality. Programs that have deep branch trees will
      quickly become
      >>>> harder to read as a single unit.
      >>>>
      >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen
      <pgwha...@gmail.com <mailto:pgwha...@gmail.com>> wrote:
      >>>>>
      >>>>> Also +1 on the issues/goals as Michael outlined them,
I think
      that sets a
      >>>>> great framework for the discussion.
      >>>>>
      >>>>> Regarding the SortedMap solution, my understanding is
that the
      current
      >>>>> proposal in the KIP is what is in my PR which
(pending naming
      decisions) is
      >>>>> roughly this:
      >>>>>
      >>>>> stream.split()
      >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
      >>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
      >>>>>    .defaultBranch(Consumer<KStream<K, V>>);
      >>>>>
      >>>>> Obviously some ordering is necessary, since branching
as a
      construct
      >>>>> doesn't work without it, but this solution seems like it
      provides as much
      >>>>> associativity as the SortedMap solution, because each
branch()
      call
      >>>>> directly associates the "conditional" with the "code
block."
      The value it
      >>>>> provides over the KIP solution is the accessing of
streams in
      the same
      >>>>> scope.
      >>>>>
      >>>>> The KIP solution is less "dynamic" than the SortedMap
solution
      in the sense
      >>>>> that it is slightly clumsier to add a dynamic number of
      branches, but it is
      >>>>> certainly possible.  It seems to me like the API
should favor
      the "static"
      >>>>> case anyway, and should make it simple and readable to
      fluently declare and
      >>>>> access your branches in-line.  It also makes it
impossible to
      ignore a
      >>>>> branch, and it is possible to build an (almost)
identical
      SortedMap
      >>>>> solution on top of it.
      >>>>>
      >>>>> I could also see a middle ground where instead of a raw
      SortedMap being
      >>>>> taken in, branch() takes a name and not a Consumer.
Something
      like this:
      >>>>>
      >>>>> Map<String, KStream<K, V>> branches = stream.split()
      >>>>>    .branch("branchOne", Predicate<K, V>)
      >>>>>    .branch( "branchTwo", Predicate<K, V>)
      >>>>>    .defaultBranch("defaultBranch",
Consumer<KStream<K, V>>);
      >>>>>
      >>>>> Pros for that solution:
      >>>>> - accessing branched KStreams in same scope
      >>>>> - no double brace initialization, hopefully slightly
more
      readable than
      >>>>> SortedMap
      >>>>>
      >>>>> Cons
      >>>>> - downstream branch logic cannot be specified inline
which
      makes it harder
      >>>>> to read top to bottom (like existing API and
SortedMap, but
      unlike the KIP)
      >>>>> - you can forget to "handle" one of the branched
streams (like
      existing
      >>>>> API and SortedMap, but unlike the KIP)
      >>>>>
      >>>>> (KBranchedStreams could even work *both* ways but
perhaps
      that's overdoing
      >>>>> it).
      >>>>>
      >>>>> Overall I'm curious how important it is to be able to
easily
      access the
      >>>>> branched KStream in the same scope as the original.
It's
      possible that it
      >>>>> doesn't need to be handled directly by the API, but
instead
      left up to the
      >>>>> user.  I'm sort of in the middle on it.
      >>>>>
      >>>>> Paul
      >>>>>
      >>>>>
      >>>>>
      >>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman
      <sop...@confluent.io <mailto:sop...@confluent.io>>
      >>>>> wrote:
      >>>>>
      >>>>>> I'd like to +1 what Michael said about the issues
with the
      existing
      >>>>> branch
      >>>>>> method, I agree with what he's outlined and I think
we should
      proceed by
      >>>>>> trying to alleviate these problems. Specifically it
seems
      important to be
      >>>>>> able to cleanly access the individual branches (eg
by mapping
      >>>>>> name->stream), which I thought was the original
intention of
      this KIP.
      >>>>>>
      >>>>>> That said, I don't think we should so easily give in
to the
      double brace
      >>>>>> anti-pattern or force ours users into it if at all
possible to
      >>>>> avoid...just
      >>>>>> my two cents.
      >>>>>>
      >>>>>> Cheers,
      >>>>>> Sophie
      >>>>>>
      >>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
      >>>>>> michael.droga...@confluent.io
      <mailto:michael.droga...@confluent.io>> wrote:
      >>>>>>
      >>>>>>> I’d like to propose a different way of thinking
about this.
      To me,
      >>>>> there
      >>>>>>> are three problems with the existing branch signature:
      >>>>>>>
      >>>>>>> 1. If you use it the way most people do, Java
raises unsafe
type
      >>>>>> warnings.
      >>>>>>> 2. The way in which you use the stream branches is
      positionally coupled
      >>>>>> to
      >>>>>>> the ordering of the conditionals.
      >>>>>>> 3. It is brittle to extend existing branch calls with
      additional code
      >>>>>>> paths.
      >>>>>>>
      >>>>>>> Using associative constructs instead of relying on
ordered
      constructs
      >>>>>> would
      >>>>>>> be a stronger approach. Consider a signature that
instead
      looks like
      >>>>>> this:
      >>>>>>> Map<String, KStream<K,V>>
KStream#branch(SortedMap<String,
      Predicate<?
      >>>>>>> super K,? super V>>);
      >>>>>>>
      >>>>>>> Branches are given names in a map, and as a result,
the API
      returns a
      >>>>>>> mapping of names to streams. The ordering of the
conditionals is
      >>>>>> maintained
      >>>>>>> because it’s a sorted map. Insert order determines
the order
of
      >>>>>> evaluation.
      >>>>>>> This solves problem 1 because there are no more
varargs. It
      solves
      >>>>>> problem
      >>>>>>> 2 because you no longer lean on ordering to access the
      branch you’re
      >>>>>>> interested in. It solves problem 3 because you can
introduce
      another
      >>>>>>> conditional by simply attaching another name to the
      structure, rather
      >>>>>> than
      >>>>>>> messing with the existing indices.
      >>>>>>>
      >>>>>>> One of the drawbacks is that creating the map
inline is
      historically
      >>>>>>> awkward in Java. I know it’s an anti-pattern to use
      voluminously, but
      >>>>>>> double brace initialization would clean up the
aesthetics.
      >>>>>>>
      >>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler
      <j...@confluent.io <mailto:j...@confluent.io>>
      >>>>> wrote:
      >>>>>>>> Hi Ivan,
      >>>>>>>>
      >>>>>>>> Thanks for the update.
      >>>>>>>>
      >>>>>>>> FWIW, I agree with Matthias that the current "start
branching"
      >>>>> operator
      >>>>>>> is
      >>>>>>>> confusing when named the same way as the actual
branches.
      "Split"
      >>>>> seems
      >>>>>>>> like a good name. Alternatively, we can do without
a "start
      >>>>> branching"
      >>>>>>>> operator at all, and just do:
      >>>>>>>>
      >>>>>>>> stream
      >>>>>>>>      .branch(Predicate)
      >>>>>>>>      .branch(Predicate)
      >>>>>>>>      .defaultBranch();
      >>>>>>>>
      >>>>>>>> Tentatively, I think that this branching operation
should be
      >>>>> terminal.
      >>>>>>> That
      >>>>>>>> way, we don't create ambiguity about how to use
it. That
      is, `branch`
      >>>>>>>> should return `KBranchedStream`, while
`defaultBranch` is
      `void`, to
      >>>>>>>> enforce that it comes last, and that there is only
one
      definition of
      >>>>>> the
      >>>>>>>> default branch. Potentially, we should log a
warning if
      there's no
      >>>>>>> default,
      >>>>>>>> and additionally log a warning (or throw an
exception) if a
      record
      >>>>>> falls
      >>>>>>>> though with no default.
      >>>>>>>>
      >>>>>>>> Thoughts?
      >>>>>>>>
      >>>>>>>> Thanks,
      >>>>>>>> -John
      >>>>>>>>
      >>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <
      >>>>> matth...@confluent.io <mailto:matth...@confluent.io>
      >>>>>>>> wrote:
      >>>>>>>>
      >>>>>>>>> Thanks for updating the KIP and your answers.
      >>>>>>>>>
      >>>>>>>>>
      >>>>>>>>>> this is to make the name similar to String#split
      >>>>>>>>>>> that also returns an array, right?
      >>>>>>>>> The intend was to avoid name duplication. The
return type
      should
      >>>>>> _not_
      >>>>>>>>> be an array.
      >>>>>>>>>
      >>>>>>>>> The current proposal is
      >>>>>>>>>
      >>>>>>>>> stream.branch()
      >>>>>>>>>      .branch(Predicate)
      >>>>>>>>>      .branch(Predicate)
      >>>>>>>>>      .defaultBranch();
      >>>>>>>>>
      >>>>>>>>> IMHO, this reads a little odd, because the first
      `branch()` does
      >>>>> not
      >>>>>>>>> take any parameters and has different semantics
than the
later
      >>>>>>>>> `branch()` calls. Note, that from the code
snippet above,
it's
      >>>>> hidden
      >>>>>>>>> that the first call is `KStream#branch()` while
the others
are
      >>>>>>>>> `KBranchedStream#branch()` what makes reading the
code
harder.
      >>>>>>>>>
      >>>>>>>>> Because I suggested to rename `addBranch()` ->
`branch()`,
      I though
      >>>>>> it
      >>>>>>>>> might be better to also rename `KStream#branch()`
to avoid
the
      >>>>> naming
      >>>>>>>>> overlap that seems to be confusing. The following
reads
much
      >>>>> cleaner
      >>>>>> to
      >>>>>>>> me:
      >>>>>>>>> stream.split()
      >>>>>>>>>      .branch(Predicate)
      >>>>>>>>>      .branch(Predicate)
      >>>>>>>>>      .defaultBranch();
      >>>>>>>>>
      >>>>>>>>> Maybe there is a better alternative to `split()`
though to
      avoid
      >>>>> the
      >>>>>>>>> naming overlap.
      >>>>>>>>>
      >>>>>>>>>
      >>>>>>>>>> 'default' is, however, a reserved word, so
unfortunately
we
      >>>>> cannot
      >>>>>>> have
      >>>>>>>>> a method with such name :-)
      >>>>>>>>>
      >>>>>>>>> Bummer. Didn't consider this. Maybe we can still
come up
      with a
      >>>>> short
      >>>>>>>> name?
      >>>>>>>>>
      >>>>>>>>> Can you add the interface `KBranchedStream` to
the KIP
      with all
      >>>>> it's
      >>>>>>>>> methods? It will be part of public API and should be
      contained in
      >>>>> the
      >>>>>>>>> KIP. For example, it's unclear atm, what the
return type of
      >>>>>>>>> `defaultBranch()` is.
      >>>>>>>>>
      >>>>>>>>>
      >>>>>>>>> You did not comment on the idea to add a
      `KBranchedStream#get(int
      >>>>>>> index)
      >>>>>>>>> -> KStream` method to get the individually
      branched-KStreams. Would
      >>>>>> be
      >>>>>>>>> nice to get your feedback about it. It seems you
suggest
      that users
      >>>>>>>>> would need to write custom utility code
otherwise, to
      access them.
      >>>>> We
      >>>>>>>>> should discuss the pros and cons of both
approaches. It
feels
      >>>>>>>>> "incomplete" to me atm, if the API has no
built-in support
      to get
      >>>>> the
      >>>>>>>>> branched-KStreams directly.
      >>>>>>>>>
      >>>>>>>>>
      >>>>>>>>>
      >>>>>>>>> -Matthias
      >>>>>>>>>
      >>>>>>>>>
      >>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
      >>>>>>>>>> Hi all!
      >>>>>>>>>>
      >>>>>>>>>> I have updated the KIP-418 according to the new
vision.
      >>>>>>>>>>
      >>>>>>>>>> Matthias, thanks for your comment!
      >>>>>>>>>>
      >>>>>>>>>>> Renaming KStream#branch() -> #split()
      >>>>>>>>>> I can see your point: this is to make the name
similar to
      >>>>>>> String#split
      >>>>>>>>>> that also returns an array, right? But is it
worth the
      loss of
      >>>>>>>> backwards
      >>>>>>>>>> compatibility? We can have overloaded branch()
as well
      without
      >>>>>>>> affecting
      >>>>>>>>>> the existing code. Maybe the old array-based
`branch`
method
      >>>>> should
      >>>>>>> be
      >>>>>>>>>> deprecated, but this is a subject for discussion.
      >>>>>>>>>>
      >>>>>>>>>>> Renaming KBranchedStream#addBranch() ->
      >>>>> BranchingKStream#branch(),
      >>>>>>>>>> KBranchedStream#defaultBranch() ->
BranchingKStream#default()
      >>>>>>>>>>
      >>>>>>>>>> Totally agree with 'addBranch->branch' rename.
'default'
is,
      >>>>>>> however, a
      >>>>>>>>>> reserved word, so unfortunately we cannot have a
method
      with such
      >>>>>>> name
      >>>>>>>>> :-)
      >>>>>>>>>>> defaultBranch() does take an `Predicate` as
argument,
but I
      >>>>> think
      >>>>>>> that
      >>>>>>>>>> is not required?
      >>>>>>>>>>
      >>>>>>>>>> Absolutely! I think that was just copy-paste
error or
      something.
      >>>>>>>>>>
      >>>>>>>>>> Dear colleagues,
      >>>>>>>>>>
      >>>>>>>>>> please revise the new version of the KIP and
Paul's PR
      >>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
      >>>>>>>>>>
      >>>>>>>>>> Any new suggestions/objections?
      >>>>>>>>>>
      >>>>>>>>>> Regards,
      >>>>>>>>>>
      >>>>>>>>>> Ivan
      >>>>>>>>>>
      >>>>>>>>>>
      >>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax пишет:
      >>>>>>>>>>> Thanks for driving the discussion of this KIP.
It seems
that
      >>>>>>> everybody
      >>>>>>>>>>> agrees that the current branch() method using
arrays is
not
      >>>>>> optimal.
      >>>>>>>>>>> I had a quick look into the PR and I like the
overall
      proposal.
      >>>>>>> There
      >>>>>>>>>>> are some minor things we need to consider. I would
      recommend the
      >>>>>>>>>>> following renaming:
      >>>>>>>>>>>
      >>>>>>>>>>> KStream#branch() -> #split()
      >>>>>>>>>>> KBranchedStream#addBranch() ->
BranchingKStream#branch()
      >>>>>>>>>>> KBranchedStream#defaultBranch() ->
      BranchingKStream#default()
      >>>>>>>>>>>
      >>>>>>>>>>> It's just a suggestion to get slightly shorter
method
names.
      >>>>>>>>>>>
      >>>>>>>>>>> In the current PR, defaultBranch() does take an
      `Predicate` as
      >>>>>>>> argument,
      >>>>>>>>>>> but I think that is not required?
      >>>>>>>>>>>
      >>>>>>>>>>> Also, we should consider KIP-307, that was
recently
      accepted and
      >>>>>> is
      >>>>>>>>>>> currently implemented:
      >>>>>>>>>>>
      >>>>>

https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

      >>>>>>>>>>> Ie, we should add overloads that accepted a
`Named`
      parameter.
      >>>>>>>>>>>
      >>>>>>>>>>>
      >>>>>>>>>>> For the issue that the created `KStream` object
are in
      different
      >>>>>>>> scopes:
      >>>>>>>>>>> could we extend `KBranchedStream` with a `get(int
      index)` method
      >>>>>>> that
      >>>>>>>>>>> returns the corresponding "branched" result
`KStream`
      object?
      >>>>>> Maybe,
      >>>>>>>> the
      >>>>>>>>>>> second argument of `addBranch()` should not be a
      >>>>>> `Consumer<KStream>`
      >>>>>>>> but
      >>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could
return
      whatever
      >>>>>> the
      >>>>>>>>>>> `Function` returns?
      >>>>>>>>>>>
      >>>>>>>>>>>
      >>>>>>>>>>> Finally, I would also suggest to update the KIP
with the
      current
      >>>>>>>>>>> proposal. That makes it easier to review.
      >>>>>>>>>>>
      >>>>>>>>>>>
      >>>>>>>>>>> -Matthias
      >>>>>>>>>>>
      >>>>>>>>>>>
      >>>>>>>>>>>
      >>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
      >>>>>>>>>>>> Ivan,
      >>>>>>>>>>>>
      >>>>>>>>>>>> I'm a bit of a novice here as well, but I
think it
      makes sense
      >>>>>> for
      >>>>>>>> you
      >>>>>>>>> to
      >>>>>>>>>>>> revise the KIP and continue the discussion.
Obviously
      we'll
      >>>>> need
      >>>>>>>> some
      >>>>>>>>>>>> buy-in from committers that have actual
binding votes on
      >>>>> whether
      >>>>>>> the
      >>>>>>>>> KIP
      >>>>>>>>>>>> could be adopted.  It would be great to hear
if they
      think this
      >>>>>> is
      >>>>>>> a
      >>>>>>>>> good
      >>>>>>>>>>>> idea overall.  I'm not sure if that happens
just by
      starting a
      >>>>>>> vote,
      >>>>>>>>> or if
      >>>>>>>>>>>> there is generally some indication of interest
beforehand.
      >>>>>>>>>>>>
      >>>>>>>>>>>> That being said, I'll continue the discussion
a bit:
      assuming
      >>>>> we
      >>>>>> do
      >>>>>>>>> move
      >>>>>>>>>>>> forward the solution of "stream.branch() returns
      >>>>>> KBranchedStream",
      >>>>>>> do
      >>>>>>>>> we
      >>>>>>>>>>>> deprecate "stream.branch(...) returns
KStream[]"?  I
would
      >>>>> favor
      >>>>>>>>>>>> deprecating, since having two mutually
exclusive APIs
that
      >>>>>>> accomplish
      >>>>>>>>> the
      >>>>>>>>>>>> same thing is confusing, especially when
they're fairly
      similar
      >>>>>>>>> anyway.  We
      >>>>>>>>>>>> just need to be sure we're not making something
      >>>>>>> impossible/difficult
      >>>>>>>>> that
      >>>>>>>>>>>> is currently possible/easy.
      >>>>>>>>>>>>
      >>>>>>>>>>>> Regarding my PR - I think the general
structure would
work,
      >>>>> it's
      >>>>>>>> just a
      >>>>>>>>>>>> little sloppy overall in terms of naming and
clarity. In
      >>>>>>> particular,
      >>>>>>>>>>>> passing in the "predicates" and "children"
lists which
get
      >>>>>> modified
      >>>>>>>> in
      >>>>>>>>>>>> KBranchedStream but read from all the way
      KStreamLazyBranch is
      >>>>> a
      >>>>>>> bit
      >>>>>>>>>>>> complicated to follow.
      >>>>>>>>>>>>
      >>>>>>>>>>>> Thanks,
      >>>>>>>>>>>> Paul
      >>>>>>>>>>>>
      >>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <
      >>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
      >>>>>>>>> wrote:
      >>>>>>>>>>>>> Hi Paul!
      >>>>>>>>>>>>>
      >>>>>>>>>>>>> I read your code carefully and now I am fully
      convinced: your
      >>>>>>>> proposal
      >>>>>>>>>>>>> looks better and should work. We just have to
document
the
      >>>>>> crucial
      >>>>>>>>> fact
      >>>>>>>>>>>>> that KStream consumers are invoked as they're
added.
      And then
      >>>>>> it's
      >>>>>>>> all
      >>>>>>>>>>>>> going to be very nice.
      >>>>>>>>>>>>>
      >>>>>>>>>>>>> What shall we do now? I should re-write the
KIP and
      resume the
      >>>>>>>>>>>>> discussion here, right?
      >>>>>>>>>>>>>
      >>>>>>>>>>>>> Why are you telling that your PR 'should not
be even a
      >>>>> starting
      >>>>>>>> point
      >>>>>>>>> if
      >>>>>>>>>>>>> we go in this direction'? To me it looks like
a good
      starting
      >>>>>>> point.
      >>>>>>>>> But
      >>>>>>>>>>>>> as a novice in this project I might miss some
important
      >>>>> details.
      >>>>>>>>>>>>> Regards,
      >>>>>>>>>>>>>
      >>>>>>>>>>>>> Ivan
      >>>>>>>>>>>>>
      >>>>>>>>>>>>>
      >>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen пишет:
      >>>>>>>>>>>>>> Ivan,
      >>>>>>>>>>>>>>
      >>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the
      >>>>> stream.branch()
      >>>>>>>>> solution
      >>>>>>>>>>>>> supports this. The couponIssuer::set*
consumers will be
      >>>>> invoked
      >>>>>> as
      >>>>>>>>> they’re
      >>>>>>>>>>>>> added, not during streamsBuilder.build(). So
the user
      still
      >>>>>> ought
      >>>>>>> to
      >>>>>>>>> be
      >>>>>>>>>>>>> able to call couponIssuer.coupons() afterward
and
      depend on
      >>>>> the
      >>>>>>>>> branched
      >>>>>>>>>>>>> streams having been set.
      >>>>>>>>>>>>>> The issue I mean to point out is that it is
hard to
      access
      >>>>> the
      >>>>>>>>> branched
      >>>>>>>>>>>>> streams in the same scope as the original
stream (that
      is, not
      >>>>>>>> inside
      >>>>>>>>> the
      >>>>>>>>>>>>> couponIssuer), which is a problem with both
proposed
      >>>>> solutions.
      >>>>>> It
      >>>>>>>>> can be
      >>>>>>>>>>>>> worked around though.
      >>>>>>>>>>>>>> [Also, great to hear additional interest in
401, I’m
      excited
      >>>>> to
      >>>>>>>> hear
      >>>>>>>>>>>>> your thoughts!]
      >>>>>>>>>>>>>> Paul
      >>>>>>>>>>>>>>
      >>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <
      >>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
      >>>>>>>>> wrote:
      >>>>>>>>>>>>>>> Hi Paul!
      >>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>> The idea to postpone the wiring of branches
to the
      >>>>>>>>>>>>> streamsBuilder.build() also looked great for
me at
first
      >>>>> glance,
      >>>>>>> but
      >>>>>>>>> ---
      >>>>>>>>>>>>>>>> the newly branched streams are not
available in the
      same
      >>>>>> scope
      >>>>>>> as
      >>>>>>>>> each
      >>>>>>>>>>>>> other.  That is, if we wanted to merge them back
together
      >>>>> again
      >>>>>> I
      >>>>>>>>> don't see
      >>>>>>>>>>>>> a way to do that.
      >>>>>>>>>>>>>>> You just took the words right out of my
mouth, I was
      just
      >>>>>> going
      >>>>>>> to
      >>>>>>>>>>>>> write in details about this issue.
      >>>>>>>>>>>>>>> Consider the example from Bill's book, p.
101: say
      we need
      >>>>> to
      >>>>>>>>> identify
      >>>>>>>>>>>>> customers who have bought coffee and made a
purchase
      in the
      >>>>>>>>> electronics
      >>>>>>>>>>>>> store to give them coupons.
      >>>>>>>>>>>>>>> This is the code I usually write under these
      circumstances
      >>>>>> using
      >>>>>>>> my
      >>>>>>>>>>>>> 'brancher' class:
      >>>>>>>>>>>>>>> @Setter
      >>>>>>>>>>>>>>> class CouponIssuer{
      >>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
      >>>>>>>>>>>>>>>   private KStream<....> electronicsPurchases;
      >>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>   KStream<...> coupons(){
      >>>>>>>>>>>>>>>       return
      >>>>>>>>>
coffePurchases.join(electronicsPurchases...)...whatever
      >>>>>>>>>>>>>>>       /*In the real world the code here can be
      complex, so
      >>>>>>>>> creation of
      >>>>>>>>>>>>> a separate CouponIssuer class is fully
justified, in
      order to
      >>>>>>>> separate
      >>>>>>>>>>>>> classes' responsibilities.*/
      >>>>>>>>>>>>>>>  }
      >>>>>>>>>>>>>>> }
      >>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>> CouponIssuer couponIssuer = new
CouponIssuer();
      >>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
      >>>>>>>>>>>>>>>     .branch(predicate1,
couponIssuer::setCoffePurchases)
      >>>>>>>>>>>>>>>     .branch(predicate2,
      >>>>>> couponIssuer::setElectronicsPurchases)
      >>>>>>>>>>>>>>>     .onTopOf(transactionStream);
      >>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>> /*Alas, this won't work if we're going to
wire up
      everything
      >>>>>>>> later,
      >>>>>>>>>>>>> without the terminal operation!!!*/
      >>>>>>>>>>>>>>> couponIssuer.coupons()...
      >>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>> Does this make sense?  In order to properly
      initialize the
      >>>>>>>>> CouponIssuer
      >>>>>>>>>>>>> we need the terminal operation to be called
before
      >>>>>>>>> streamsBuilder.build()
      >>>>>>>>>>>>> is called.
      >>>>>>>>>>>>>>> [BTW Paul, I just found out that your
KIP-401 is
      essentially
      >>>>>> the
      >>>>>>>>> next
      >>>>>>>>>>>>> KIP I was going to write here. I have some
thoughts
      based on
      >>>>> my
      >>>>>>>>> experience,
      >>>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
      >>>>>>>>>>>>>>> Regards,
      >>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>> Ivan
      >>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen пишет:
      >>>>>>>>>>>>>>>> Ivan,
      >>>>>>>>>>>>>>>> I tried to make a very rough proof of
concept of a
      fluent
      >>>>> API
      >>>>>>>> based
      >>>>>>>>>>>>> off of
      >>>>>>>>>>>>>>>> KStream here
      (https://github.com/apache/kafka/pull/6512),
      >>>>>> and
      >>>>>>> I
      >>>>>>>>> think
      >>>>>>>>>>>>> I
      >>>>>>>>>>>>>>>> succeeded at removing both cons.
      >>>>>>>>>>>>>>>>    - Compatibility: I was incorrect
earlier about
      >>>>>>> compatibility
      >>>>>>>>>>>>> issues,
      >>>>>>>>>>>>>>>>    there aren't any direct ones.  I was
unaware
      that Java
      >>>>> is
      >>>>>>>> smart
      >>>>>>>>>>>>> enough to
      >>>>>>>>>>>>>>>>    distinguish between a branch(varargs...)
      returning one
      >>>>>>> thing
      >>>>>>>>> and
      >>>>>>>>>>>>> branch()
      >>>>>>>>>>>>>>>>    with no arguments returning another thing.
      >>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't
actually
      need
      >>>>> it.
      >>>>>>> We
      >>>>>>>>> can
      >>>>>>>>>>>>> just
      >>>>>>>>>>>>>>>>    build up the branches in the
KBranchedStream who
      shares
      >>>>>> its
      >>>>>>>>> state
      >>>>>>>>>>>>> with the
      >>>>>>>>>>>>>>>>    ProcessorSupplier that will actually do
the
      branching.
      >>>>>>> It's
      >>>>>>>>> not
      >>>>>>>>>>>>> terribly
      >>>>>>>>>>>>>>>>    pretty in its current form, but I think it
      demonstrates
      >>>>>> its
      >>>>>>>>>>>>> feasibility.
      >>>>>>>>>>>>>>>> To be clear, I don't think that pull
request should
be
      >>>>> final
      >>>>>> or
      >>>>>>>>> even a
      >>>>>>>>>>>>>>>> starting point if we go in this direction,
I just
      wanted to
      >>>>>> see
      >>>>>>>> how
      >>>>>>>>>>>>>>>> challenging it would be to get the API
working.
      >>>>>>>>>>>>>>>> I will say though, that I'm not sure the
existing
      solution
      >>>>>>> could
      >>>>>>>> be
      >>>>>>>>>>>>>>>> deprecated in favor of this, which I had
originally
      >>>>> suggested
      >>>>>>>> was a
      >>>>>>>>>>>>>>>> possibility.  The reason is that the newly
branched
      streams
      >>>>>> are
      >>>>>>>> not
      >>>>>>>>>>>>>>>> available in the same scope as each
other.  That
      is, if we
      >>>>>>> wanted
      >>>>>>>>> to
      >>>>>>>>>>>>> merge
      >>>>>>>>>>>>>>>> them back together again I don't see a way
to do
      that.  The
      >>>>>> KIP
      >>>>>>>>>>>>> proposal
      >>>>>>>>>>>>>>>> has the same issue, though - all this
means is that
for
      >>>>>> either
      >>>>>>>>>>>>> solution,
      >>>>>>>>>>>>>>>> deprecating the existing branch(...) is
not on the
      table.
      >>>>>>>>>>>>>>>> Thanks,
      >>>>>>>>>>>>>>>> Paul
      >>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan
Ponomarev <
      >>>>>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>>
      >>>>>>>>>>>>> wrote:
      >>>>>>>>>>>>>>>>> OK, let me summarize what we have
discussed up to
this
      >>>>>> point.
      >>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed
that
      branch API
      >>>>>>> needs
      >>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> There are two potential ways to do it:
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> 1. (as origianlly proposed)
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
      >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
      >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
      >>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
      >>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...)....
//onTopOf
      returns
      >>>>>> its
      >>>>>>>>> argument
      >>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2)
The code
won't
      >>>>> make
      >>>>>>>> sense
      >>>>>>>>>>>>> until
      >>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> CONS: The need to create a
KafkaStreamsBrancher
      instance
      >>>>>>>>> contrasts the
      >>>>>>>>>>>>>>>>> fluency of other KStream methods.
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> 2. (as Paul proposes)
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> stream
      >>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
      >>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
      >>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or
noDefault(). Both
      >>>>>>>>> defaultBranch(..)
      >>>>>>>>>>>>> and
      >>>>>>>>>>>>>>>>> noDefault() return void
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams
interface
is
      >>>>>> defined.
      >>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods
      >>>>>>>> (defaultBranch(ks->)
      >>>>>>>>> and
      >>>>>>>>>>>>>>>>> noDefault()). And for a user it is very
easy to
      miss the
      >>>>>> fact
      >>>>>>>>> that one
      >>>>>>>>>>>>>>>>> of the terminal methods should be called.
If these
      methods
      >>>>>> are
      >>>>>>>> not
      >>>>>>>>>>>>>>>>> called, we can throw an exception in
runtime.
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can
we do
better?
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> Regards,
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> Ivan
      >>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev пишет:
      >>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev пишет:
      >>>>>>>>>>>>>>>>>>> Paul,
      >>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>> I see your point when you are talking
about
      >>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
      >>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>> Still, I believe that this cannot not be
      implemented the
      >>>>>>> easy
      >>>>>>>>> way.
      >>>>>>>>>>>>>>>>>>> Maybe we all should think further.
      >>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
      >>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>> user could specify a terminal method that
assumes
      >>>>> nothing
      >>>>>>>> will
      >>>>>>>>>>>>> reach
      >>>>>>>>>>>>>>>>>>>> the default branch,
      >>>>>>>>>>>>>>>>>>> throwing an exception if such a case
occurs.
      >>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be
the only
option
      >>>>>> besides
      >>>>>>>>>>>>>>>>>>> `default`, because there are scenarios
when we
      want to
      >>>>>> just
      >>>>>>>>> silently
      >>>>>>>>>>>>>>>>>>> drop the messages that didn't match any
      predicate. 2)
      >>>>>>> Throwing
      >>>>>>>>> an
      >>>>>>>>>>>>>>>>>>> exception in the middle of data flow
processing
      looks
      >>>>>> like a
      >>>>>>>> bad
      >>>>>>>>>>>>> idea.
      >>>>>>>>>>>>>>>>>>> In stream processing paradigm, I would
prefer to
      emit a
      >>>>>>>> special
      >>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is
exactly
where
      >>>>>>> `default`
      >>>>>>>>> can
      >>>>>>>>>>>>> be
      >>>>>>>>>>>>>>>>>>> used.
      >>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>> it would be fairly easily for the
      >>>>> InternalTopologyBuilder
      >>>>>>> to
      >>>>>>>>> track
      >>>>>>>>>>>>>>>>>>>> dangling
      >>>>>>>>>>>>>>>>>>> branches that haven't been terminated
and raise
      a clear
      >>>>>>> error
      >>>>>>>>>>>>> before it
      >>>>>>>>>>>>>>>>>>> becomes an issue.
      >>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the
program is
      >>>>> compiled
      >>>>>>> and
      >>>>>>>>> run?
      >>>>>>>>>>>>>>>>>>> Well,  I'd prefer an API that simply won't
      compile if
      >>>>> used
      >>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a
      method chain
      >>>>>>>> starting
      >>>>>>>>>>>>> from
      >>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost
difference
      between
      >>>>>>>> runtime
      >>>>>>>>> and
      >>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure
uncovers
      >>>>> instantly
      >>>>>> on
      >>>>>>>>> unit
      >>>>>>>>>>>>>>>>>>> tests, it costs more for the project
than a
      compilation
      >>>>>>>> failure.
      >>>>>>>>>>>>>>>>>>> Regards,
      >>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>> Ivan
      >>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen пишет:
      >>>>>>>>>>>>>>>>>>>> Ivan,
      >>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>> Good point about the terminal
operation being
      required.
      >>>>>>> But
      >>>>>>>> is
      >>>>>>>>>>>>> that
      >>>>>>>>>>>>>>>>>>>> really
      >>>>>>>>>>>>>>>>>>>> such a bad thing?  If the user doesn't
want a
      >>>>>> defaultBranch
      >>>>>>>>> they
      >>>>>>>>>>>>> can
      >>>>>>>>>>>>>>>>>>>> call
      >>>>>>>>>>>>>>>>>>>> some other terminal method
(noDefaultBranch()?)
      just as
      >>>>>>>>> easily.  In
      >>>>>>>>>>>>>>>>>>>> fact I
      >>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a
nicer API
- a
      >>>>> user
      >>>>>>>> could
      >>>>>>>>>>>>> specify
      >>>>>>>>>>>>>>>>> a
      >>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing
will reach
the
      >>>>>> default
      >>>>>>>>> branch,
      >>>>>>>>>>>>>>>>>>>> throwing an exception if such a case
occurs.
That
      >>>>> seems
      >>>>>>> like
      >>>>>>>>> an
      >>>>>>>>>>>>>>>>>>>> improvement over the current branch()
API,
      which allows
      >>>>>> for
      >>>>>>>> the
      >>>>>>>>>>>>> more
      >>>>>>>>>>>>>>>>>>>> subtle
      >>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting
dropped.
      >>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>> The need for a terminal operation
certainly has
      to be
      >>>>>> well
      >>>>>>>>>>>>>>>>>>>> documented, but
      >>>>>>>>>>>>>>>>>>>> it would be fairly easily for the
      >>>>> InternalTopologyBuilder
      >>>>>>> to
      >>>>>>>>> track
      >>>>>>>>>>>>>>>>>>>> dangling
      >>>>>>>>>>>>>>>>>>>> branches that haven't been terminated
and raise
      a clear
      >>>>>>> error
      >>>>>>>>>>>>> before it
      >>>>>>>>>>>>>>>>>>>> becomes an issue.  Especially now that
there is
a
      >>>>> "build
      >>>>>>>> step"
      >>>>>>>>>>>>> where
      >>>>>>>>>>>>>>>>> the
      >>>>>>>>>>>>>>>>>>>> topology is actually wired up, when
      >>>>>> StreamsBuilder.build()
      >>>>>>> is
      >>>>>>>>>>>>> called.
      >>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its
argument, I
agree
      >>>>> that
      >>>>>>> it's
      >>>>>>>>>>>>>>>>>>>> critical to
      >>>>>>>>>>>>>>>>>>>> allow users to do other operations on
the input
      stream.
      >>>>>>> With
      >>>>>>>>> the
      >>>>>>>>>>>>>>>>> fluent
      >>>>>>>>>>>>>>>>>>>> solution, it ought to work the same
way all
other
      >>>>>>> operations
      >>>>>>>>> do -
      >>>>>>>>>>>>> if
      >>>>>>>>>>>>>>>>> you
      >>>>>>>>>>>>>>>>>>>> want to process off the original KStream
multiple
      >>>>> times,
      >>>>>>> you
      >>>>>>>>> just
      >>>>>>>>>>>>>>>>>>>> need the
      >>>>>>>>>>>>>>>>>>>> stream as a variable so you can call
as many
      operations
      >>>>>> on
      >>>>>>> it
      >>>>>>>>> as
      >>>>>>>>>>>>> you
      >>>>>>>>>>>>>>>>>>>> desire.
      >>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>> Thoughts?
      >>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>> Best,
      >>>>>>>>>>>>>>>>>>>> Paul
      >>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan
Ponomarev <
      >>>>>>>>> iponoma...@mail.ru <mailto:iponoma...@mail.ru>
      >>>>>>>>>>>>>>>>>>>> wrote:
      >>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>> Hello Paul,
      >>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>> I afraid this won't work because we
do not
      always need
      >>>>>> the
      >>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal
operation we
      >>>>> don't
      >>>>>>>> know
      >>>>>>>>>>>>> when to
      >>>>>>>>>>>>>>>>>>>>> finalize and build the 'branch switch'.
      >>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its
argument,
      so we
      >>>>> can
      >>>>>> do
      >>>>>>>>>>>>> something
      >>>>>>>>>>>>>>>>>>>>> more with the original branch after
branching.
      >>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>> I understand your point that the need of
special
      >>>>> object
      >>>>>>>>>>>>> construction
      >>>>>>>>>>>>>>>>>>>>> contrasts the fluency of most KStream
methods.
But
      >>>>> here
      >>>>>> we
      >>>>>>>>> have a
      >>>>>>>>>>>>>>>>>>>>> special case: we build the switch to
split the
      flow,
      >>>>> so
      >>>>>> I
      >>>>>>>>> think
      >>>>>>>>>>>>> this
      >>>>>>>>>>>>>>>>> is
      >>>>>>>>>>>>>>>>>>>>> still idiomatic.
      >>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>> Regards,
      >>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>> Ivan
      >>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen пишет:
      >>>>>>>>>>>>>>>>>>>>>> Ivan,
      >>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve
this
      API, but I
      >>>>>> find
      >>>>>>>> the
      >>>>>>>>>>>>>>>>>>>>>> onTopOff()
      >>>>>>>>>>>>>>>>>>>>>> mechanism a little confusing since it
      contrasts the
      >>>>>>> fluency
      >>>>>>>>> of
      >>>>>>>>>>>>> other
      >>>>>>>>>>>>>>>>>>>>>> KStream method calls.  Ideally I'd
like to
      just call
      >>>>> a
      >>>>>>>>> method on
      >>>>>>>>>>>>> the
      >>>>>>>>>>>>>>>>>>>>> stream
      >>>>>>>>>>>>>>>>>>>>>> so it still reads top to bottom if
the branch
      cases
      >>>>> are
      >>>>>>>>> defined
      >>>>>>>>>>>>>>>>>>>>>> fluently.
      >>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate,
handleCase)
      is very
      >>>>>> nice
      >>>>>>>>> and the
      >>>>>>>>>>>>>>>>>>>>>> right
      >>>>>>>>>>>>>>>>>>>>> way
      >>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped
around
      how we
      >>>>>>> specify
      >>>>>>>>> the
      >>>>>>>>>>>>> source
      >>>>>>>>>>>>>>>>>>>>>> stream.
      >>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>> Like:
      >>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>> stream.branch()
      >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1,
this::handle1)
      >>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2,
this::handle2)
      >>>>>>>>>>>>>>>>>>>>>>
.defaultBranch(this::handleDefault);
      >>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>> Where branch() returns a
KBranchedStreams or
      >>>>>>>> KStreamBrancher
      >>>>>>>>> or
      >>>>>>>>>>>>>>>>>>>>> something,
      >>>>>>>>>>>>>>>>>>>>>> which is added to by addBranch() and
      terminated by
      >>>>>>>>>>>>> defaultBranch()
      >>>>>>>>>>>>>>>>>>>>>> (which
      >>>>>>>>>>>>>>>>>>>>>> returns void).  This is obviously
      incompatible with
      >>>>> the
      >>>>>>>>> current
      >>>>>>>>>>>>>>>>>>>>>> API, so
      >>>>>>>>>>>>>>>>>>>>> the
      >>>>>>>>>>>>>>>>>>>>>> new stream.branch() would have to
have a
      different
      >>>>>> name,
      >>>>>>>> but
      >>>>>>>>> that
      >>>>>>>>>>>>>>>>>>>>>> seems
      >>>>>>>>>>>>>>>>>>>>>> like a fairly small problem - we
could call it
      >>>>>> something
      >>>>>>>> like
      >>>>>>>>>>>>>>>>>>>>>> branched()
      >>>>>>>>>>>>>>>>>>>>> or
      >>>>>>>>>>>>>>>>>>>>>> branchedStreams() and deprecate the
old API.
      >>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of
your
      KIP?  It
      >>>>>> seems
      >>>>>>>>> like it
      >>>>>>>>>>>>>>>>>>>>>> does to
      >>>>>>>>>>>>>>>>>>>>>> me, allowing for clear in-line
branching
      while also
      >>>>>>>> allowing
      >>>>>>>>> you
      >>>>>>>>>>>>> to
      >>>>>>>>>>>>>>>>>>>>>> dynamically build of branches off of
      KBranchedStreams
      >>>>>> if
      >>>>>>>>> desired.
      >>>>>>>>>>>>>>>>>>>>>> Thanks,
      >>>>>>>>>>>>>>>>>>>>>> Paul
      >>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan
Ponomarev
      >>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid>
      >>>>>>>>>>>>>>>>>>>>>> wrote:
      >>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
      >>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
      >>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
      >>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>> void
handleFirstCase(KStream<String, String>
      ks){
      >>>>>>>>>>>>>>>>>>>>>>>
ks.filter(....).mapValues(...)
      >>>>>>>>>>>>>>>>>>>>>>> }
      >>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String,
      String> ks){
      >>>>>>>>>>>>>>>>>>>>>>>
ks.selectKey(...).groupByKey()...
      >>>>>>>>>>>>>>>>>>>>>>> }
      >>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>> ......
      >>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String,
String>()
      >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate1,
      this::handleFirstCase)
      >>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate2,
      this::handleSecondCase)
      >>>>>>>>>>>>>>>>>>>>>>>      .onTopOf(....)
      >>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>> Regards,
      >>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>> Ivan
      >>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck пишет:
      >>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
      >>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
      >>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>> I have one question, the
KafkaStreamsBrancher
      >>>>> takes a
      >>>>>>>>> Consumer
      >>>>>>>>>>>>> as a
      >>>>>>>>>>>>>>>>>>>>>>> second
      >>>>>>>>>>>>>>>>>>>>>>>> argument which returns nothing,
and the
      example in
      >>>>>> the
      >>>>>>>> KIP
      >>>>>>>>>>>>> shows
      >>>>>>>>>>>>>>>>>>>>>>>> each
      >>>>>>>>>>>>>>>>>>>>>>>> stream from the branch using a
terminal node
      >>>>>>>>> (KafkaStreams#to()
      >>>>>>>>>>>>>>>>>>>>>>>> in this
      >>>>>>>>>>>>>>>>>>>>>>>> case).
      >>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but
how would
we
      >>>>> handle
      >>>>>>> the
      >>>>>>>>> case
      >>>>>>>>>>>>>>>>>>>>>>>> where the
      >>>>>>>>>>>>>>>>>>>>>>>> user has created a branch but
wants to
continue
      >>>>>>>> processing
      >>>>>>>>> and
      >>>>>>>>>>>>> not
      >>>>>>>>>>>>>>>>>>>>>>>> necessarily use a terminal node on
the
branched
      >>>>>> stream
      >>>>>>>>>>>>> immediately?
      >>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic
as is if
      we had
      >>>>>>>> something
      >>>>>>>>> like
      >>>>>>>>>>>>>>>>>>>>>>>> this:
      >>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
      >>>>>>>>>>>>>>>>>>>>>>>> originalStream.branch(predicate1,
      >>>>>>>>>>>>>>>>>>>>>>>> predicate2);
      >>>>>>>>>>>>>>>>>>>>>>>>
branches[0].filter(....).mapValues(...)..
      >>>>>>>>>>>>>>>>>>>>>>>>
branches[1].selectKey(...).groupByKey().....
      >>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>> Thanks!
      >>>>>>>>>>>>>>>>>>>>>>>> Bill
      >>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM
Bill Bejeck
<
      >>>>>>>>> bbej...@gmail.com <mailto:bbej...@gmail.com>
      >>>>>>>>>>>>>>>>>>>>>>>> wrote:
      >>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>>> All,
      >>>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the
discussion for
KIP-
      >>>>> 418.
      >>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
      >>>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>>> Hello,
      >>>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about
KIP-418.
      >>>>> Please
      >>>>>>>> take
      >>>>>>>>> a
      >>>>>>>>>>>>> look
      >>>>>>>>>>>>>>>>> at
      >>>>>>>>>>>>>>>>>>>>> the
      >>>>>>>>>>>>>>>>>>>>>>>>> KIP if you can, I would
appreciate any
      feedback :)
      >>>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>>> KIP-418:
      >>>>>

https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream

      >>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488:
      >>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-5488
      >>>>>>>>>>>>>>>>>>>>>>>>> PR#6164:
      >>>>> https://github.com/apache/kafka/pull/6164
      >>>>>>>>>>>>>>>>>>>>>>>>> Regards,
      >>>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
      >>>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>>>>>>>>>>>>>>>>>
      >>>>>>>>>
      >










Reply via email to