Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
o each
> > >> `branch()` statement and return a `Map`.
> > >>
> > >> It makes the code easier to read, and also makes the order of
> > >> `Predicates` (which is essential) easier to grasp.
> > >>
> > >>>>>> Map<String, KStream<K, V>> branches = stream.split()
> > >>>>>>     .branch("branchOne", Predicate)
> > >>>>>>     .branch("branchTwo", Predicate)
> > >>>>>>     .defaultBranch("defaultBranch");
> > >>
> > >> An open question is the case for which no defaultBranch() should be
> > >> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
> > >> and the call to `defaultBranch()` that returns the `Map` is mandatory
> > >> (which is not the case atm). Or is this actually not a real problem,
> > >> because users can just ignore the branch returned by `defaultBranch()`
> > >> in the result `Map`?
> > >>
> > >>
> > >> About "inlining": So far, it seems to be a matter of personal
> > >> preference. I can see arguments for both, but no "killer argument" yet
> > >> that clearly makes the case for one or the other.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>> On 5/1/19 6:26 PM, Paul Whalen wrote:
> > >>> Perhaps inlining is the wrong terminology. It doesn’t require that a
> > >>> lambda with the full downstream topology be defined inline - it can be
> > >>> a method reference as with Ivan’s original suggestion. The advantage of
> > >>> putting the predicate and its downstream logic (Consumer) together in
> > >>> branch() is that they are required to be near to each other.
> > >>>
> > >>> Ultimately the downstream code has to live somewhere, and deep branch
> > >>> trees will be hard to read regardless.
> > >>>
> > >>>> On May 1, 2019, at 1:07 PM, Michael Drogalis <mailto:michael.droga...@confluent.io> wrote:
> > >>>>
> > >>>> I'm less enthusiastic about inlining the branch logic with its downstream
> > >>>> functionality. Programs that have deep branch trees will quickly become
> > >>>> harder to read as a single unit.
> > >>>>
> > >>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <mailto:pgwha...@gmail.com> wrote:
> > >>>>>
> > >>>>> Also +1 on the issues/goals as Michael outlined them, I think that sets a
> > >>>>> great framework for the discussion.
> > >>>>>
> > >>>>> Regarding the SortedMap solution, my understanding is that the current
> > >>>>> proposal in the KIP is what is in my PR which (pending naming decisions) is
> > >>>>> roughly this:
> > >>>>>
> > >>>>> stream.split()
> > >>>>>     .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> > >>>>>     .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
> > >>>>>     .defaultBranch(Consumer<KStream<K, V>>);
> > >>>>>
> > >>>>> Obviously some ordering is necessary, since branching as a construct
> > >>>>> doesn't work without it, but this solution seems like it provides as much
> > >>>>> associativity as the SortedMap solution, because each branch() call
> > >>>>> directly associates the "conditional" with the "code block." The value it
> > >>>>> provides over the KIP solution is the accessing of streams in the same
> > >>>>> scope.
> > >>>>>
> > >>>>> The KIP solution is less "dynamic" than the SortedMap solution in the sense
> > >>>>> that it is slightly clumsier to add a dynamic number of branches, but it is
> > >>>>> certainly possible. It seems to me like the API should favor the "static"
> > >>>>> case anyway, and s
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
I'm less enthusiastic about inlining the branch logic with its downstream
functionality. Programs that have deep branch trees will quickly become
harder to read as a single unit.

On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen wrote:

> Also +1 on the issues/goals as Michael outlined them, I think that sets a
> great framework for the discussion.
>
> Regarding the SortedMap solution, my understanding is that the current
> proposal in the KIP is what is in my PR which (pending naming decisions) is
> roughly this:
>
> stream.split()
>     .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>     .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>     .defaultBranch(Consumer<KStream<K, V>>);
>
> Obviously some ordering is necessary, since branching as a construct
> doesn't work without it, but this solution seems like it provides as much
> associativity as the SortedMap solution, because each branch() call
> directly associates the "conditional" with the "code block." The value it
> provides over the KIP solution is the accessing of streams in the same
> scope.
>
> The KIP solution is less "dynamic" than the SortedMap solution in the sense
> that it is slightly clumsier to add a dynamic number of branches, but it is
> certainly possible. It seems to me like the API should favor the "static"
> case anyway, and should make it simple and readable to fluently declare and
> access your branches in-line. It also makes it impossible to ignore a
> branch, and it is possible to build an (almost) identical SortedMap
> solution on top of it.
>
> I could also see a middle ground where instead of a raw SortedMap being
> taken in, branch() takes a name and not a Consumer. Something like this:
>
> Map<String, KStream<K, V>> branches = stream.split()
>     .branch("branchOne", Predicate)
>     .branch("branchTwo", Predicate)
>     .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>
> Pros for that solution:
> - accessing branched KStreams in same scope
> - no double brace initialization, hopefully slightly more readable than
>   SortedMap
>
> Cons
> - downstream branch logic cannot be specified inline which makes it harder
>   to read top to bottom (like existing API and SortedMap, but unlike the KIP)
> - you can forget to "handle" one of the branched streams (like existing
>   API and SortedMap, but unlike the KIP)
>
> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
> it).
>
> Overall I'm curious how important it is to be able to easily access the
> branched KStream in the same scope as the original. It's possible that it
> doesn't need to be handled directly by the API, but instead left up to the
> user. I'm sort of in the middle on it.
>
> Paul
>
>
> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman wrote:
>
> > I'd like to +1 what Michael said about the issues with the existing branch
> > method, I agree with what he's outlined and I think we should proceed by
> > trying to alleviate these problems. Specifically it seems important to be
> > able to cleanly access the individual branches (eg by mapping
> > name->stream), which I thought was the original intention of this KIP.
> >
> > That said, I don't think we should so easily give in to the double brace
> > anti-pattern or force our users into it if at all possible to avoid...just
> > my two cents.
> >
> > Cheers,
> > Sophie
> >
> > On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
> > michael.droga...@confluent.io> wrote:
> >
> > > I’d like to propose a different way of thinking about this. To me, there
> > > are three problems with the existing branch signature:
> > >
> > > 1. If you use it the way most people do, Java raises unsafe type warnings.
> > > 2. The way in which you use the stream branches is positionally coupled to
> > >    the ordering of the conditionals.
> > > 3. It is brittle to extend existing branch calls with additional code
> > >    paths.
> > >
> > > Using associative constructs instead of relying on ordered constructs would
> > > be a stronger approach. Consider a signature that instead looks like this:
> > >
> > > Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);
> > >
> > > Branches are given names in a map, and as a result, the API returns a
> > > mapping of names to streams. The ordering of the conditionals is maintained
> > > because it’s a sorted map. Insert order determines the order of
> > > evaluation.
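
For readers skimming the thread, the named-branch "middle ground" quoted above can be pictured with a small, self-contained sketch. Everything in it is an assumption made for illustration: `NamedSplitter` is a hypothetical class that routes plain list elements rather than Kafka records, and it is not the KIP-418 or Kafka Streams API. It only shows the shape being discussed, a `split()` entry point, ordered `branch(name, predicate)` calls, and a terminal `defaultBranch(name)` that returns a `Map` keyed by branch name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for the split()/branch()/defaultBranch() chain.
// It works on in-memory lists and is NOT the Kafka Streams API.
final class NamedSplitter<T> {
    private final List<T> input;
    // LinkedHashMap remembers the order in which branch() was called.
    private final Map<String, Predicate<T>> predicates = new LinkedHashMap<>();

    private NamedSplitter(List<T> input) {
        this.input = input;
    }

    static <T> NamedSplitter<T> split(List<T> input) {
        return new NamedSplitter<>(input);
    }

    NamedSplitter<T> branch(String name, Predicate<T> predicate) {
        predicates.put(name, predicate);
        return this;
    }

    // Terminal call: each element goes to the first branch whose predicate
    // accepts it, or to the default branch when none does.
    Map<String, List<T>> defaultBranch(String defaultName) {
        Map<String, List<T>> result = new LinkedHashMap<>();
        for (String name : predicates.keySet()) {
            result.put(name, new ArrayList<>());
        }
        result.put(defaultName, new ArrayList<>());
        for (T element : input) {
            String target = defaultName;
            for (Map.Entry<String, Predicate<T>> entry : predicates.entrySet()) {
                if (entry.getValue().test(element)) {
                    target = entry.getKey();
                    break;
                }
            }
            result.get(target).add(element);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> branches = NamedSplitter.split(Arrays.asList(1, 2, 3, 4, 5, 6))
            .branch("even", n -> n % 2 == 0)
            .branch("small", n -> n < 4)
            .defaultBranch("rest");
        System.out.println(branches); // {even=[2, 4, 6], small=[1, 3], rest=[5]}
    }
}
```

In this sketch the caller looks branches up by name rather than by array index, which is the property problems 2 and 3 in the quoted message are about. The evaluation order still depends on the order of the `branch()` calls.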
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
I’d like to propose a different way of thinking about this. To me, there
are three problems with the existing branch signature:

1. If you use it the way most people do, Java raises unsafe type warnings.
2. The way in which you use the stream branches is positionally coupled to
   the ordering of the conditionals.
3. It is brittle to extend existing branch calls with additional code
   paths.

Using associative constructs instead of relying on ordered constructs would
be a stronger approach. Consider a signature that instead looks like this:

Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);

Branches are given names in a map, and as a result, the API returns a
mapping of names to streams. The ordering of the conditionals is maintained
because it’s a sorted map. Insert order determines the order of evaluation.

This solves problem 1 because there are no more varargs. It solves problem
2 because you no longer lean on ordering to access the branch you’re
interested in. It solves problem 3 because you can introduce another
conditional by simply attaching another name to the structure, rather than
messing with the existing indices.

One of the drawbacks is that creating the map inline is historically
awkward in Java. I know it’s an anti-pattern to use voluminously, but
double brace initialization would clean up the aesthetics.

On Tue, Apr 30, 2019 at 9:10 AM John Roesler wrote:

> Hi Ivan,
>
> Thanks for the update.
>
> FWIW, I agree with Matthias that the current "start branching" operator is
> confusing when named the same way as the actual branches. "Split" seems
> like a good name. Alternatively, we can do without a "start branching"
> operator at all, and just do:
>
> stream
>     .branch(Predicate)
>     .branch(Predicate)
>     .defaultBranch();
>
> Tentatively, I think that this branching operation should be terminal. That
> way, we don't create ambiguity about how to use it. That is, `branch`
> should return `KBranchedStream`, while `defaultBranch` is `void`, to
> enforce that it comes last, and that there is only one definition of the
> default branch. Potentially, we should log a warning if there's no default,
> and additionally log a warning (or throw an exception) if a record falls
> through with no default.
>
> Thoughts?
>
> Thanks,
> -John
>
> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax wrote:
>
> > Thanks for updating the KIP and your answers.
> >
> >
> > > this is to make the name similar to String#split
> > > that also returns an array, right?
> >
> > The intent was to avoid name duplication. The return type should _not_
> > be an array.
> >
> > The current proposal is
> >
> > stream.branch()
> >     .branch(Predicate)
> >     .branch(Predicate)
> >     .defaultBranch();
> >
> > IMHO, this reads a little odd, because the first `branch()` does not
> > take any parameters and has different semantics than the later
> > `branch()` calls. Note that from the code snippet above, it's hidden
> > that the first call is `KStream#branch()` while the others are
> > `KBranchedStream#branch()`, which makes reading the code harder.
> >
> > Because I suggested to rename `addBranch()` -> `branch()`, I thought it
> > might be better to also rename `KStream#branch()` to avoid the naming
> > overlap that seems to be confusing. The following reads much cleaner to me:
> >
> > stream.split()
> >     .branch(Predicate)
> >     .branch(Predicate)
> >     .defaultBranch();
> >
> > Maybe there is a better alternative to `split()` though to avoid the
> > naming overlap.
> >
> >
> > > 'default' is, however, a reserved word, so unfortunately we cannot have
> > > a method with such name :-)
> >
> > Bummer. Didn't consider this. Maybe we can still come up with a short name?
> >
> >
> > Can you add the interface `KBranchedStream` to the KIP with all its
> > methods? It will be part of public API and should be contained in the
> > KIP. For example, it's unclear atm, what the return type of
> > `defaultBranch()` is.
> >
> >
> > You did not comment on the idea to add a `KBranchedStream#get(int index)
> > -> KStream` method to get the individually branched-KStreams. Would be
> > nice to get your feedback about it. It seems you suggest that users
> > would need to write custom utility code otherwise, to access them. We
> > should discuss the pros and cons of both approaches. It feels
> > "incomplete" to me atm, if the API has no built-in support to get the
> > branched-KStreams directly.
> >
> >
> >
> > -Matthias
> >
> >
> > On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > > Hi all!
> > >
> > > I have updated the KIP-418 according to the new vision.
> > >
> > > Matthias, thanks for your comment!
> > >
> > >> Renaming KStream#branch() -> #split()
> > >
> > > I can see your point: this is to make the name similar to String#split
> > > that also returns an array, right? But is it worth the loss of backwards
> > > compatibility? We can have overloaded branch() as well without affecting
> > > the existing code. Maybe the old ar
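
One detail of the map-based proposal at the top of this message may be worth checking when reading it: in the JDK, a `java.util.SortedMap` such as `TreeMap` iterates in key order, while `java.util.LinkedHashMap` iterates in insertion order. The sketch below illustrates the difference with plain `java.util.function.Predicate` values. The class `BranchOrderDemo` and its methods are hypothetical names made up for this illustration and have nothing to do with Kafka Streams.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

final class BranchOrderDemo {
    // Returns the name of the first predicate, in the map's own iteration
    // order, that accepts the value.
    static String firstMatch(Map<String, Predicate<Integer>> branches, int value) {
        for (Map.Entry<String, Predicate<Integer>> entry : branches.entrySet()) {
            if (entry.getValue().test(value)) {
                return entry.getKey();
            }
        }
        return "unmatched";
    }

    // Inserts a specific branch first and a catch-all branch second.
    static Map<String, Predicate<Integer>> fill(Map<String, Predicate<Integer>> map) {
        map.put("small", n -> n < 10);
        map.put("any", n -> true);
        return map;
    }

    public static void main(String[] args) {
        // Insertion order: "small" is tested before the catch-all.
        System.out.println(firstMatch(fill(new LinkedHashMap<>()), 5)); // small
        // Key order: "any" sorts before "small", so the catch-all wins.
        System.out.println(firstMatch(fill(new TreeMap<>()), 5));       // any
    }
}
```

If evaluation order is meant to follow the order in which branches are declared, an insertion-ordered map (or an explicit comparator) would be needed. Which one the proposal intends is not settled by the text above.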
[jira] [Created] (KAFKA-8297) Kafka Streams ConsumerRecordFactory yields difficult compiler error about generics
Michael Drogalis created KAFKA-8297:
---------------------------------------

Summary: Kafka Streams ConsumerRecordFactory yields difficult compiler error about generics
Key: KAFKA-8297
URL: https://issues.apache.org/jira/browse/KAFKA-8297
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Michael Drogalis

When using the ConsumerRecordFactory, it's convenient to specify a default topic to create records with:

{code:java}
ConsumerRecordFactory<String, User> inputFactory = new ConsumerRecordFactory<>(inputTopic, keySerializer, valueSerializer);
{code}

However, when the factory is used to create a record with a String key:

{code:java}
inputFactory.create("any string", user)
{code}

Compilation fails with the following error:

{code:java}
Ambiguous method call. Both:

create(String, User) in ConsumerRecordFactory and
create(String, User) in ConsumerRecordFactory match
{code}

At first glance, this is a really confusing error to see during compilation. What's happening is that there are two clashing signatures for `create`: create(K, V) and create(String, V). The latter signature represents a topic name.

It seems like fixing this would require breaking the existing interface. This is a really opaque problem to hit though, and it would be great if we could avoid having users encounter this.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
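
To make the overload clash easier to see in isolation, here is a minimal sketch that reproduces the same pair of signatures without any Kafka dependency. `RecordFactory` is a hypothetical class written only for this illustration, and it returns strings instead of records. The point is that `create(K, V)` and `create(String, V)` are distinct until `K` is bound to `String`, at which point javac can no longer pick one.

```java
// Hypothetical miniature of the two clashing signatures; not the real
// ConsumerRecordFactory from kafka-streams-test-utils.
final class RecordFactory<K, V> {
    private final String defaultTopic;

    RecordFactory(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    // Creates a "record" on the default topic with the given key.
    String create(K key, V value) {
        return defaultTopic + ":" + key + "=" + value;
    }

    // Creates a "record" on an explicit topic with no key.
    String create(String topicName, V value) {
        return topicName + ":null=" + value;
    }

    public static void main(String[] args) {
        RecordFactory<Integer, String> byInt = new RecordFactory<>("users");
        System.out.println(byInt.create(1, "alice"));        // users:1=alice
        System.out.println(byInt.create("other", "alice"));  // other:null=alice

        RecordFactory<String, String> byString = new RecordFactory<>("users");
        // The next line does not compile when uncommented, because both
        // overloads become create(String, String) once K is String:
        // byString.create("key", "alice");
    }
}
```

With a non-String key type the two overloads stay distinguishable, which is presumably why the problem only surfaces for String-keyed factories.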
[jira] [Created] (KAFKA-8296) Kafka Streams branch method raises type warnings
Michael Drogalis created KAFKA-8296:
---------------------------------------

Summary: Kafka Streams branch method raises type warnings
Key: KAFKA-8296
URL: https://issues.apache.org/jira/browse/KAFKA-8296
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Michael Drogalis

Because the branch method in the DSL takes varargs, using it as follows raises an unchecked type warning:

{code:java}
KStream[] branches = builder.stream(inputTopic)
    .branch((key, user) -> "united states".equals(user.getCountry()),
            (key, user) -> "germany".equals(user.getCountry()),
            (key, user) -> "mexico".equals(user.getCountry()),
            (key, user) -> true);
{code}

The compiler warns with:

{code:java}
Warning:(39, 24) java: unchecked generic array creation for varargs parameter of type org.apache.kafka.streams.kstream.Predicate[]
{code}

This is an unfortunate consequence of the way Java's type system and generics work. We could possibly fix this by adding the @SafeVarargs annotation to the branch method signatures.
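
As a rough illustration of the suggested fix, the sketch below declares a generic varargs method with `@SafeVarargs` and calls it with lambdas, mirroring the shape of the snippet in the report. The class `VarargsBranchDemo` and the method `firstMatchingBranch` are hypothetical and use `java.util.function.BiPredicate` instead of the Kafka `Predicate` type. One caveat that may matter for the real API: the Java Language Specification only allows `@SafeVarargs` on constructors and on methods that are `static`, `final`, or (since Java 9) `private`, so the annotation cannot be placed on an overridable interface method as written here.

```java
import java.util.function.BiPredicate;

final class VarargsBranchDemo {
    // Without @SafeVarargs, call sites that pass generic lambdas here get
    // "unchecked generic array creation for varargs parameter".
    // The method only reads from the array, so the annotation is justified.
    @SafeVarargs
    static <K, V> int firstMatchingBranch(K key, V value,
                                          BiPredicate<? super K, ? super V>... predicates) {
        for (int i = 0; i < predicates.length; i++) {
            if (predicates[i].test(key, value)) {
                return i;
            }
        }
        return -1; // no predicate matched
    }

    public static void main(String[] args) {
        int index = firstMatchingBranch("user-1", "germany",
            (key, country) -> "united states".equals(country),
            (key, country) -> "germany".equals(country),
            (key, country) -> true);
        System.out.println(index); // 1
    }
}
```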
[jira] [Created] (KAFKA-8213) KStreams interactive query documentation typo
Michael Drogalis created KAFKA-8213:
---------------------------------------

Summary: KStreams interactive query documentation typo
Key: KAFKA-8213
URL: https://issues.apache.org/jira/browse/KAFKA-8213
Project: Kafka
Issue Type: Bug
Components: documentation
Reporter: Michael Drogalis

In [the Interactive Queries docs|https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app], we have a minor typo:

Actual: You can use the corresponding local data in other parts of your application code, as long as it doesn’t required calling the Kafka Streams API.

Expected: You can use the corresponding local data in other parts of your application code, as long as it doesn’t require calling the Kafka Streams API.
[jira] [Created] (KAFKA-8212) KStreams documentation Maven artifact table is cut off
Michael Drogalis created KAFKA-8212:
---------------------------------------

Summary: KStreams documentation Maven artifact table is cut off
Key: KAFKA-8212
URL: https://issues.apache.org/jira/browse/KAFKA-8212
Project: Kafka
Issue Type: Bug
Components: documentation
Reporter: Michael Drogalis
Attachments: Screen Shot 2019-04-10 at 2.04.09 PM.png

In the [Writing a Streams Application doc|https://kafka.apache.org/21/documentation/streams/developer-guide/write-streams.html], the section "LIBRARIES AND MAVEN ARTIFACTS" has a table that lists out the Maven artifacts. The items in the group ID overflow and are cut off by the table column, even on a very large monitor. Note that "artifact ID" seems to have its word break property set correctly.

See the attached image.
[jira] [Created] (KAFKA-8210) Missing link for KStreams in Streams DSL docs
Michael Drogalis created KAFKA-8210:
---------------------------------------

Summary: Missing link for KStreams in Streams DSL docs
Key: KAFKA-8210
URL: https://issues.apache.org/jira/browse/KAFKA-8210
Project: Kafka
Issue Type: Bug
Components: documentation
Reporter: Michael Drogalis

In [the Streams DSL docs|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html], there is some text under the KTable section that reads:

"We have already seen an example of a changelog stream in the section streams_concepts_duality."

"streams_concepts_duality" seems to indicate that it should be a link, but it is not.
[jira] [Created] (KAFKA-8209) Wrong link for KStreams DSL in Core Concepts doc
Michael Drogalis created KAFKA-8209:
---------------------------------------

Summary: Wrong link for KStreams DSL in Core Concepts doc
Key: KAFKA-8209
URL: https://issues.apache.org/jira/browse/KAFKA-8209
Project: Kafka
Issue Type: Bug
Components: documentation
Reporter: Michael Drogalis

In the [core concepts doc|https://kafka.apache.org/21/documentation/streams/core-concepts], there is a link in the "States" section for "Kafka Streams DSL". It points to the wrong link.

Actual: https://kafka.apache.org/21/documentation/streams/developer-guide/#streams_dsl

Expected: https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html
[jira] [Created] (KAFKA-8208) Broken link for out-of-order data in KStreams Core Concepts doc
Michael Drogalis created KAFKA-8208:
---------------------------------------

Summary: Broken link for out-of-order data in KStreams Core Concepts doc
Key: KAFKA-8208
URL: https://issues.apache.org/jira/browse/KAFKA-8208
Project: Kafka
Issue Type: Improvement
Components: documentation
Reporter: Michael Drogalis

In the [core concepts doc|https://kafka.apache.org/21/documentation/streams/core-concepts], there is a link in the "Out-of-Order Handling" section for "out-of-order data". It 404's to https://kafka.apache.org/21/documentation/streams/tbd.
[jira] [Created] (KAFKA-8200) TopologyTestDriver should offer an iterable signature of readOutput
Michael Drogalis created KAFKA-8200:
---------------------------------------

Summary: TopologyTestDriver should offer an iterable signature of readOutput
Key: KAFKA-8200
URL: https://issues.apache.org/jira/browse/KAFKA-8200
Project: Kafka
Issue Type: Improvement
Reporter: Michael Drogalis

When using the TopologyTestDriver, one examines the output on a topic with the readOutput method. This method returns one record at a time, until no more records can be found, at which point it returns null.

Many times, the usage pattern around readOutput will involve writing a loop to extract a number of records from the topic, building up a list of records, until it returns null.

It would be helpful to offer an iterable signature of readOutput, which returns either an iterator or a list over the records that are currently available in the topic. This would effectively remove the loop that users need to write for themselves each time.

Such a signature might look like:

{code:java}
public Iterable<ProducerRecord<byte[], byte[]>> readOutput(java.lang.String topic);
{code}
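
The loop described above can be written once as a small generic helper. The sketch below assumes nothing about Kafka: `DrainDemo.drain` is a hypothetical utility that keeps calling any null-terminated reader, modelled as a `java.util.function.Supplier`, and collects the results into a list. A `Queue` stands in for the output topic because `Queue.poll()` also returns null when nothing is left.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

final class DrainDemo {
    // Calls readOne until it returns null and collects everything read
    // before that point, in order.
    static <T> List<T> drain(Supplier<T> readOne) {
        List<T> records = new ArrayList<>();
        for (T record = readOne.get(); record != null; record = readOne.get()) {
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        // The queue plays the role of the output topic in this sketch.
        Queue<String> topic = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        System.out.println(drain(topic::poll)); // [a, b, c]
    }
}
```

Against a real test driver, the same helper could presumably be called as `drain(() -> testDriver.readOutput("output-topic"))`, where the topic name is a placeholder.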
[jira] [Created] (KAFKA-8198) KStreams testing docs use non-existent method "pipe"
Michael Drogalis created KAFKA-8198:
---------------------------------------

Summary: KStreams testing docs use non-existent method "pipe"
Key: KAFKA-8198
URL: https://issues.apache.org/jira/browse/KAFKA-8198
Project: Kafka
Issue Type: Bug
Reporter: Michael Drogalis

In [the testing docs for KStreams|https://kafka.apache.org/20/documentation/streams/developer-guide/testing.html], we use the following code snippet:

{code:java}
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));
{code}

As of Apache Kafka 2.2.0, this method no longer exists. We should correct the docs to use the pipeInput method.
[jira] [Created] (KAFKA-8181) Streams docs on serialization include Avro header, but no content
Michael Drogalis created KAFKA-8181:
---------------------------------------

Summary: Streams docs on serialization include Avro header, but no content
Key: KAFKA-8181
URL: https://issues.apache.org/jira/browse/KAFKA-8181
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.2.0
Reporter: Michael Drogalis

On [the documentation for data types and serialization|https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes.html], Avro is listed in the table of contents as something supported out of the box. The link is dead, though, because there is no content. We should either remove the header or supply the content.
[jira] [Created] (KAFKA-7855) Kafka Streams Maven Archetype quickstart fails to compile out of the box
Michael Drogalis created KAFKA-7855:
---------------------------------------

Summary: Kafka Streams Maven Archetype quickstart fails to compile out of the box
Key: KAFKA-7855
URL: https://issues.apache.org/jira/browse/KAFKA-7855
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.1.0
Environment: Java 8, OS X 10.13.6
Reporter: Michael Drogalis
Attachments: output.log

When I follow the [quickstart tutorial|https://kafka.apache.org/21/documentation/streams/tutorial] and issue the command to set up a new Maven project, the generated example fails to compile. Adding a Produced.with() on the source seems to fix this.

I've attached the compiler output.