Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
> To recap: Could it be that the idea to apply a DISTINCT-aggregation is for a different use-case than to remove duplicate messages from a KStream?

OK, imagine the following: we have 10 thermometers. Each second they transmit the current measured temperature. The ID of the thermometer is the key, the temperature is the value. The temperature measured by a single thermometer changes slowly: on average, say, once per 30 seconds, so most of the messages from a given thermometer are duplicates. But we need to react to a change ASAP. And we need a materialized view: a relational database table with the columns 'thermometerID' and 'temperature'.

1) We don't want to send 100K updates per second to the database.
2) But it's still OK if some updates are duplicates -- the updates are idempotent.
3) We can even afford to lose a fraction of the data -- the data from each thermometer will eventually be 'fixed' by its fresh readings.

This is my use case (actually these are not thermometers and temperatures, but it doesn't matter :-) Is it conceptually different from what you were thinking about?

Regards, Ivan

On 09.08.2021 3:05, Matthias J. Sax wrote:

Thanks for sharing your thoughts. I guess my first question about why to use the key boils down to the use case, and maybe you have something else in mind than I do.

> I see it this way: we define the 'distinct' operation as returning a single record per time window per selected key,

I believe this sentence explains your way of thinking about it. My way of thinking about it is different though: KStream de-duplication means to "filter/drop" duplicate records, and a record is by definition a duplicate if key AND value are the same. --- Or is this a use case for which there might be a "message ID", and even if the "message ID" is the same, the content of the message might be different? If this holds, do we really de-duplicate records (it sounds more like picking a random one)?

Just re-read some of my older replies, and I guess back then I just commented on your KeyExtractor idea without considering the end-to-end picture. Thus, my reply below goes in a different direction now:

We only need to apply a window because we need to purge state eventually. To this end, I actually believe that applying a "sliding window" is the best way to go: for each message we encounter, we start the de-duplication window when the message arrives, don't emit any duplicates as long as the window is open, and purge the state afterwards. Of course, internally, the implementation must be quite different compared to a regular aggregation: we need to pull the value into the key to create a unique window for each message, but this seems to be an implementation detail.

Thus, I am wondering if we should not try to put a square peg into a round hole: the current windowing/aggregation API is not designed for a KStream de-duplication use-case, because a de-duplication is no aggregation to begin with. Why not use a different API:

KStream<K, V> KStream#deduplicate(final Duration windowSize);

including some more overloads to allow configuring the internal state store (this state store should not be queryable, similar to KStream-KStream join state stores...).

To recap: Could it be that the idea to apply a DISTINCT-aggregation is for a different use-case than to remove duplicate messages from a KStream?

-Matthias

On 8/6/21 4:12 AM, Ivan Ponomarev wrote:

> - Why restrict de-duplication to the key only? Should we not also consider the value (or make it somehow flexible and let the user choose?)
Wasn't it the first idea that we abandoned (I mean, to provide a 'KeyExtractor' and so on)? In order to keep things simple we decided to make

.selectKey(...)   // here we select anything we need;
                  // add markAsPartitioned from KIP-759 to taste
.groupByKey()
.windowedBy(...)
.distinct()       // the only new operation that we add to the API, reusing
                  // all the windowed aggregations' infrastructure

> - I am wondering if the return type should be `KStream` instead of a `KTable`? If I deduplicate a stream, I might expect a stream back? I don't really consider a stream de-duplication an aggregation with "mutable state"...

First, because it's going to be an operation on a Time/SessionWindowedKStream, and these operations usually return KTable<Windowed<K>, ...>. Then, it might be useful to know to which time window a deduplicated record actually belongs. And it is a trivial task to turn this table back into a stream.

> IMHO, an unordered stream and its ordered "cousin" should yield the same result? -- Given your example it seems you want to keep the first record based on offset order. Wondering why?

I see it this way: we define the 'distinct' operation as returning a single record per time window per selected key, no matter which record. So it's OK if it yields different results for different orderings, as long as its main property holds!
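For illustration, the thermometer use case could be wired up with the pipeline described above. This is only a sketch: it assumes KIP-655's distinct() and KIP-759's markAsPartitioned() as proposed (neither exists in released Kafka Streams), and the topic name, types, and default serdes are made up:

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.*;

    StreamsBuilder builder = new StreamsBuilder();
    // assumes default serdes are configured for String/Double
    KStream<String, Double> readings = builder.stream("thermometer-readings");

    KTable<Windowed<String>, Double> changes = readings
        // fold the value into the key, so identical readings become duplicate keys
        .selectKey((id, temp) -> id + "#" + temp)
        // duplicates always share the original thermometer ID, so they are already
        // co-partitioned; the user asserts this and skips the repartition (KIP-759)
        .markAsPartitioned()
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
        // forward only the first record per (key, window): the proposed distinct()
        .distinct();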
Re: [DISCUSS] KIP-759: Unneeded repartition canceling
…windowed aggregations: the result of a window aggregation is partitioned by the "plain key": if we write the result into a topic using the same partitioning function, we would write to different partitions... (I guess it was never an issue so far, as we don't have KIP-300 in place yet...) It's also not an issue for IQ, because we know the plain key, and thus can route to the right task.

About a solution: I think it might be OK to say that we don't need to solve this problem, but that it's the user's responsibility to take IQ into account. I.e., if they want to use IQ downstream, they need to repartition: for this case, repartitioning is _NOT_ unnecessary... The same argument seems to apply for the join case I mentioned above. -- Given that `markAsPartitioned()` is an advanced feature, it seems OK to leave it to the user to use correctly (we should of course call it out in the docs!).

-Matthias

On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:

Before I dive in to the question of IQ and the approaches you proposed, can you just elaborate on the problem itself? By definition, the `markAsPartitioned` flag means that a repartition would be a no-op, i.e. that the stream (and its partitioning) would be the same whether or not a repartition is inserted. For this to be true, it then has to be the case that

Partitioner.partition(key) == Partitioner.partition(map(key))

The left-hand side of the above is precisely how we determine the partition number that a key belongs to when using IQ. It shouldn't matter whether the user is querying a key in a store upstream of the key-changing operation or a mapped key downstream of it -- either way we just apply the given Partitioner. See StreamsMetadataState#getKeyQueryMetadataForKey <https://github.com/apache/kafka/blob/6854eb8332d6ef1f1c6216d2f67d6e146b1ef60f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L283> for where this happens.

If we're concerned that users might try to abuse the new `markAsPartitioned` feature, or accidentally misuse it, then we could add a runtime check that applies the Partitioner associated with that subtopology to the key being processed and to the mapped key result, to assert that they do indeed match. Imo this is probably overkill, just putting it out there.

On Sat, Aug 7, 2021 at 1:42 PM Ivan Ponomarev wrote:

Hi Sophie, thanks for your reply!

So your proposal is:

1) For each key-changing operation, deprecate the existing overloads that accept a Named, and replace them with overloads that take an operator-specific config object.
2) Add a `markAsPartitioned` flag to these configs.

IMO, this looks much better than the original proposal, I like it very much and I think I will rewrite the KIP soon. I absolutely agree with your points. Repartition logic is not a part of the public contract, and it's much better to give it correct hints instead of telling it explicitly what it should do.

... Since we're generating such bright ideas, maybe we should also brainstorm the interactive query problem? The problem is that interactive queries will not work properly when `markAsPartitioned` is used. Although the original key and the mapped key will be in the same partition, we will no longer be able to guess this partition given the mapped key only. The possible approaches are:

1) Give up and don't use interactive queries together with `markAsPartitioned`. This is what I propose now. But can we do better?

2) Maybe we should ask the user to provide a 'reverse mapping' that will allow IQ to restore the original key in order to choose the correct partition. We can place this mapping in our new configuration object. Of course, there is no way for Kafka Streams to verify at compile time/startup time that this function is actually the reverse mapping that extracts the old key from the new one. Users will forget to provide this function. Users will provide wrong functions. This all looks too fragile.

3) Maybe there can be a completely different approach. Let's introduce a new entity -- composite keys, consisting of a "head" and a "tail". The partition for a composite key is calculated based on its 'head' value only. If we provide a key mapping in the form key -> CompositeKey(key, tail), then it's obvious that we do not need a repartition. When an interactive query needs to guess the partition for a CompositeKey, it just extracts its head and calculates the correct partition. We could select a CompositeKey before groupByKey() and aggregation operations, and this would not involve a repartition. And IQ would work. Is it too daring an idea? WDYT? My concern: will it cover all the cases where we want to choose a different key but also avoid repartition?

Regards, Ivan

On 06.08.2021 23:19, Sophie Blee-Goldman wrote:

Hey Ivan, I completely agree that adding it as a config to Grouped/Joined/etc isn't much better; I was just listing it for completeness, and I would prefer to make it a configuration of the key-changing
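Sketching the composite-key idea from point 3 may make it clearer. These types are hypothetical -- nothing like them exists in Kafka Streams -- but they show why partitioning by the head alone makes the re-keying provably partition-preserving:

    import org.apache.kafka.streams.processor.StreamPartitioner;

    // Hypothetical composite key: partitioning depends only on `head`,
    // so re-keying key -> CompositeKey(key, tail) cannot change the partition.
    final class CompositeKey<H, T> {
        final H head;  // the old key; drives partitioning
        final T tail;  // extra data folded into the key
        CompositeKey(final H head, final T tail) {
            this.head = head;
            this.tail = tail;
        }
    }

    // A partitioner that routes a composite key by its head only, so IQ can
    // find the partition of CompositeKey(k, t) from k alone:
    final class CompositeKeyPartitioner<H, T, V>
            implements StreamPartitioner<CompositeKey<H, T>, V> {
        private final StreamPartitioner<H, V> headPartitioner;
        CompositeKeyPartitioner(final StreamPartitioner<H, V> headPartitioner) {
            this.headPartitioner = headPartitioner;
        }
        @Override
        public Integer partition(final String topic, final CompositeKey<H, T> key,
                                 final V value, final int numPartitions) {
            return headPartitioner.partition(topic, key.head, value, numPartitions);
        }
    }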
Re: [DISCUSS] KIP-759: Unneeded repartition canceling
…this same conversation over and over, because we can now stick any additional operator properties into that same config object. WDYT?

By the way, the above idea (introducing a single config object to wrap all operator properties) was also raised by John Roesler a while back. Let's hope he hasn't changed his mind since then :)

On Fri, Aug 6, 2021 at 3:01 AM Ivan Ponomarev wrote:

Hi Matthias,

Concerning the naming: I like `markAsPartitioned`, because it describes what this operation is actually doing!

Hi Sophie,

I see the concern about poor code cohesion. We declare the key mapping in one place of the code, then later in another place we say "markAsPartitioned()". When we change the code six months later, we might forget to remove markAsPartitioned(), especially if it's placed in another method or class. But I don't understand why you propose to include this config into Grouped/Joined/StreamJoined, because from this point of view it's not a better solution.

The best approach regarding cohesion might be to add an extra 'preservePartition' flag to every key-changing operation, that is:

1) selectKey
2) map
3) flatMap
4) transform
5) flatTransform

in order to tell whether the provided mapping requires a repartition or not. Indeed, this is a property of the mapping operation, not of the grouping one! BTW: the idea of adding an extra parameter to `selectKey` was once coined by John Roesler.

Arguments in favour of this approach: 1) better code cohesion from the point of view of the user, 2) 'smarter' code (the decision is taken depending on metadata provided for all the upstream mappings), 3) overall safer for the user.

Arguments against: an invasive KStreams API change, 5 more method overloads. Further on, when we add a new key-changing operation to KStream, we must add an overloaded version with 'preservePartition'. When we add a new overloaded version of an existing operation, we actually might need to add two or more overloaded versions. This will soon become a mess.

I thought that since `markAsPartitioned` is intended for advanced users, they will use it with care. When you're in a position where every serialization/deserialization round matters for latency, you're extremely careful with the topology, and you will not thoughtlessly add new key-changing operations without controlling how they are going to change the overall topology. By the way, if we later find a better solution, it's much easier to deprecate a single `markAsPartitioned` operation than 5 method overloads. What do you think?

On 04.08.2021 4:23, Sophie Blee-Goldman wrote:

Do we really need a whole DSL operator for this? I think the original name for this operator -- `cancelRepartition()` -- is itself a sign that this is not an operation on the stream itself but rather a command/request to whichever operator would have otherwise triggered this repartition.

What about instead adding a new field to the Grouped/Joined/StreamJoined config objects that signals them to skip the repartitioning? The one downside to this specific proposal is that you would then need to specify this for every stateful operation downstream of the key-changing operation. So a better alternative might be to introduce this `skipRepartition` field, or whatever we want to call it, to the config object of the operator that's actually doing the key-changing operation which is apparently preserving the partitioning.

Imo this would be more "safe" relative to the current proposal, as the user has to explicitly consider whether every key-changing operation is indeed preserving the partitioning. Otherwise you could code up a topology with several key-changing operations at the beginning which do require repartitioning. Then you get to the end of the topology and insert one final key-changing operation that doesn't, assume you can just cancel the repartition, and suddenly you're wondering why your results are all screwed up.

On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax wrote:

Thanks for the KIP Ivan!

I think it's a good feature to give advanced users more control, and allow them to build more efficient applications.

Not sure if I like the proposed name though (the good old "naming things" discussion :)) Did you consider alternatives? What about

- markAsPartitioned()
- markAsKeyed()
- skipRepartition()

Not sure if there are other ideas for a good name?

-Matthias

On 6/24/21 7:45 AM, Ivan Ponomarev wrote:

Hello,

I'd like to start a discussion for KIP-759: https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling

This is an offshoot of the discussion of KIP-655 for a `distinct` operator, which turned out to be a separate proposal. The proposal itself is quite trivial; however, we still might consider alternatives (see the 'Possible Alternatives' section).

Regards, Ivan
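Sophie's runtime-check suggestion from earlier in the thread could look roughly like the following. This is a sketch under assumed names (`partitionOf` stands in for whatever partitioner the subtopology actually uses); it is not an existing Kafka Streams API:

    import java.util.function.ToIntFunction;

    final class PartitioningGuard {
        // Assert that a key-changing operation marked as partitioned really
        // keeps every record on its original partition.
        static <KIn, KOut> void checkMarkAsPartitioned(
                final KIn originalKey,
                final KOut mappedKey,
                final ToIntFunction<Object> partitionOf) {
            final int expected = partitionOf.applyAsInt(originalKey);
            final int actual = partitionOf.applyAsInt(mappedKey);
            if (expected != actual) {
                throw new IllegalStateException(
                    "markAsPartitioned() violated: mapped key " + mappedKey
                        + " falls into partition " + actual
                        + " while the original key " + originalKey
                        + " falls into partition " + expected);
            }
        }
    }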
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
> - Why restrict de-duplication to the key only? Should we not also consider the value (or make it somehow flexible and let the user choose?)

Wasn't it the first idea that we abandoned (I mean, to provide a 'KeyExtractor' and so on)? In order to keep things simple we decided to make

.selectKey(...)   // here we select anything we need;
                  // add markAsPartitioned from KIP-759 to taste
.groupByKey()
.windowedBy(...)
.distinct()       // the only new operation that we add to the API, reusing
                  // all the windowed aggregations' infrastructure

> - I am wondering if the return type should be `KStream` instead of a `KTable`? If I deduplicate a stream, I might expect a stream back? I don't really consider a stream de-duplication an aggregation with "mutable state"...

First, because it's going to be an operation on a Time/SessionWindowedKStream, and these operations usually return KTable<Windowed<K>, ...>. Then, it might be useful to know to which time window a deduplicated record actually belongs. And it is a trivial task to turn this table back into a stream.

> IMHO, an unordered stream and its ordered "cousin" should yield the same result? -- Given your example it seems you want to keep the first record based on offset order. Wondering why?

I see it this way: we define the 'distinct' operation as returning a single record per time window per selected key, no matter which record. So it's OK if it yields different results for different orderings, as long as its main property holds! And since we can select any key we like, we can get any degree of 'deduplication granularity' and 'determinism'.

> While I agree that deduplication for overlapping windows is questionable, I am still wondering if you plan to disallow it (by adding a runtime check and throwing an exception), or not?

Thanks for this point! I think that the 'fail-fast' approach is good. We might need to throw an exception; I will add this to the KIP:

- SessionWindows -- OK
- SlidingWindows -- Exception
- TimeWindows:
  - tumbling -- OK
  - hopping -- Exception

Regards, Ivan

On 04.08.2021 4:22, Matthias J. Sax wrote:

Couple of questions:

- Why restrict de-duplication to the key only? Should we not also consider the value (or make it somehow flexible and let the user choose?)

- I am wondering if the return type should be `KStream` instead of a `KTable`? If I deduplicate a stream, I might expect a stream back? I don't really consider a stream de-duplication an aggregation with "mutable state"... Also, why would the result key need to be windowed?

Btw: How should out-of-order data be handled? Given that you only want to consider the key, the value could be different, and thus, if there is out-of-order data, keeping the one or the other value could make a difference? IMHO, an unordered stream and its ordered "cousin" should yield the same result? -- Given your example it seems you want to keep the first record based on offset order. Wondering why?

While I agree that deduplication for overlapping windows is questionable, I am still wondering if you plan to disallow it (by adding a runtime check and throwing an exception), or not?

On 8/1/21 6:42 AM, Ivan Ponomarev wrote:

Hi Bruno,

I'm sorry for the delay with the answer. Unfortunately, your messages were put into the spam folder; that's why I didn't answer them right away.

Concerning your question about comparing serialized values vs. using equals(): I think it must be clear now due to John's explanations. Distinct is a stateful operation, thus we will need to use serialization. (Although AFAICS an in-memory store might be a good practical solution in many cases.)

> I do currently not see why it should not make sense in hopping windows... I do not understand the following sentence: "...one record can be multiplied instead of deduplication."

OK, let me explain. As it's written in the KIP, "The distinct operation returns only a first record that falls into a new window, and filters out all the other records that fall into an already existing window." Also, it's worth remembering that the result of `distinct` is KTable<Windowed<K>, V>, not KStream<K, V>.

If we have, say, hopping time windows [0, 40], [10, 50], [20, 60] and a record (key, val) with timestamp 25 arrives, it will be forwarded three times ('multiplied'), since it falls into the intersection of all three windows. The output will be

(key@[0/40], val)
(key@[10/50], val)
(key@[20/60], val)

You can reason about the `distinct` operation just like you reason about `sum` or `count`. When a record arrives that falls into a window, we update the aggregation on this window. For `distinct`, when extra records arrive into the same window, we also perform some sort of aggregation (we may even count them internally!), but, unlike sum or count, we will not forward anything, since the counter is already strictly greater than zero. You may refer to the 'usage examples' of the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-UsageExamples) to get a clearer idea of how it works.
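The 'multiplication' can be checked directly against the real windowing API. A small sketch, assuming the 40-second windows advancing by 10 seconds from the example above:

    import java.time.Duration;
    import org.apache.kafka.streams.kstream.TimeWindows;

    // Hopping windows: size 40s, advance 10s, as in the example above.
    final TimeWindows hopping = TimeWindows.of(Duration.ofSeconds(40))
                                           .advanceBy(Duration.ofSeconds(10));

    // A record with timestamp 25s falls into three of these windows, so a
    // windowed distinct() would forward it once per window:
    hopping.windowsFor(Duration.ofSeconds(25).toMillis())
           .forEach((start, window) ->
               System.out.println("[" + window.start() + "/" + window.end() + "]"));
    // prints the three windows [0/40000], [10000/50000], [20000/60000]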
Re: [DISCUSS] KIP-759: Unneeded repartition canceling
Hi Matthias,

Concerning the naming: I like `markAsPartitioned`, because it describes what this operation is actually doing!

Hi Sophie,

I see the concern about poor code cohesion. We declare the key mapping in one place of the code, then later in another place we say "markAsPartitioned()". When we change the code six months later, we might forget to remove markAsPartitioned(), especially if it's placed in another method or class. But I don't understand why you propose to include this config into Grouped/Joined/StreamJoined, because from this point of view it's not a better solution.

The best approach regarding cohesion might be to add an extra 'preservePartition' flag to every key-changing operation, that is:

1) selectKey
2) map
3) flatMap
4) transform
5) flatTransform

in order to tell whether the provided mapping requires a repartition or not. Indeed, this is a property of the mapping operation, not of the grouping one! BTW: the idea of adding an extra parameter to `selectKey` was once coined by John Roesler.

Arguments in favour of this approach: 1) better code cohesion from the point of view of the user, 2) 'smarter' code (the decision is taken depending on metadata provided for all the upstream mappings), 3) overall safer for the user.

Arguments against: an invasive KStreams API change, 5 more method overloads. Further on, when we add a new key-changing operation to KStream, we must add an overloaded version with 'preservePartition'. When we add a new overloaded version of an existing operation, we actually might need to add two or more overloaded versions. This will soon become a mess.

I thought that since `markAsPartitioned` is intended for advanced users, they will use it with care. When you're in a position where every serialization/deserialization round matters for latency, you're extremely careful with the topology, and you will not thoughtlessly add new key-changing operations without controlling how they are going to change the overall topology. By the way, if we later find a better solution, it's much easier to deprecate a single `markAsPartitioned` operation than 5 method overloads. What do you think?

On 04.08.2021 4:23, Sophie Blee-Goldman wrote:

Do we really need a whole DSL operator for this? I think the original name for this operator -- `cancelRepartition()` -- is itself a sign that this is not an operation on the stream itself but rather a command/request to whichever operator would have otherwise triggered this repartition.

What about instead adding a new field to the Grouped/Joined/StreamJoined config objects that signals them to skip the repartitioning? The one downside to this specific proposal is that you would then need to specify this for every stateful operation downstream of the key-changing operation. So a better alternative might be to introduce this `skipRepartition` field, or whatever we want to call it, to the config object of the operator that's actually doing the key-changing operation which is apparently preserving the partitioning.

Imo this would be more "safe" relative to the current proposal, as the user has to explicitly consider whether every key-changing operation is indeed preserving the partitioning. Otherwise you could code up a topology with several key-changing operations at the beginning which do require repartitioning. Then you get to the end of the topology and insert one final key-changing operation that doesn't, assume you can just cancel the repartition, and suddenly you're wondering why your results are all screwed up.

On Tue, Aug 3, 2021 at 6:02 PM Matthias J. Sax wrote:

Thanks for the KIP Ivan!

I think it's a good feature to give advanced users more control, and allow them to build more efficient applications.

Not sure if I like the proposed name though (the good old "naming things" discussion :)) Did you consider alternatives? What about

- markAsPartitioned()
- markAsKeyed()
- skipRepartition()

Not sure if there are other ideas for a good name?

-Matthias

On 6/24/21 7:45 AM, Ivan Ponomarev wrote:

Hello,

I'd like to start a discussion for KIP-759: https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling

This is an offshoot of the discussion of KIP-655 for a `distinct` operator, which turned out to be a separate proposal. The proposal itself is quite trivial; however, we still might consider alternatives (see the 'Possible Alternatives' section).

Regards, Ivan
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
Hi Bruno,

I'm sorry for the delay with the answer. Unfortunately, your messages were put into the spam folder; that's why I didn't answer them right away.

Concerning your question about comparing serialized values vs. using equals(): I think it must be clear now due to John's explanations. Distinct is a stateful operation, thus we will need to use serialization. (Although AFAICS an in-memory store might be a good practical solution in many cases.)

> I do currently not see why it should not make sense in hopping windows... I do not understand the following sentence: "...one record can be multiplied instead of deduplication."

OK, let me explain. As it's written in the KIP, "The distinct operation returns only a first record that falls into a new window, and filters out all the other records that fall into an already existing window." Also, it's worth remembering that the result of `distinct` is KTable<Windowed<K>, V>, not KStream<K, V>.

If we have, say, hopping time windows [0, 40], [10, 50], [20, 60] and a record (key, val) with timestamp 25 arrives, it will be forwarded three times ('multiplied'), since it falls into the intersection of all three windows. The output will be

(key@[0/40], val)
(key@[10/50], val)
(key@[20/60], val)

You can reason about the `distinct` operation just like you reason about `sum` or `count`. When a record arrives that falls into a window, we update the aggregation on this window. For `distinct`, when extra records arrive into the same window, we also perform some sort of aggregation (we may even count them internally!), but, unlike sum or count, we will not forward anything, since the counter is already strictly greater than zero. You may refer to the 'usage examples' of the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-UsageExamples) to get a clearer idea of how it works.

> As I said earlier, I do not think that SQL and the Java Stream API are good arguments to not use a verb

This is an important matter. As we all know, naming is hard. However, the `distinct` name is not used just in SQL and Java Streams. It is a kind of standard operation that is used in nearly all data processing frameworks; see all the hyperlinked examples in the 'Motivation' section of the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-655:+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-Motivation). Please look at it and let me know what you think.

Regards, Ivan

On 29.07.2021 4:49, John Roesler wrote:

Hi Bruno,

I had previously been thinking to use equals(), since I thought that this might be a stateless operation. Comparing the serialized form requires a serde and a fairly expensive serialization operation, so while byte equality is superior to equals(), we shouldn't use it in operations unless they already require serialization. I changed my mind when I later realized I had been mistaken, and this operation is of course stateful. I hope this helps clarify it.

Thanks, -John

On Fri, 2021-07-23 at 09:53 +0200, Bruno Cadonna wrote:

Hi Ivan and John,

1. John, could you clarify why comparing serialized values seems the way to go now?

2. Ivan, could you please answer my questions that I posted earlier? I will repost them here: Ivan, could you please make this matter a bit clearer in the KIP? Actually, thinking about it again, I do currently not see why it should not make sense in hopping windows. Regarding this, I do not understand the following sentence: "hopping and sliding windows do not make much sense for distinct() because they produce multiple intersected windows, so that one record can be multiplied instead of deduplication." Ivan, what do you mean by "multiplied"?

3. As I said earlier, I do not think that SQL and the Java Stream API are good arguments to not use a verb. However, if everybody else is fine with it, I can get behind it.

John, good catch about the missing overloads! BTW, the overload with Named should be there regardless of stateful or stateless.

Best, Bruno

On 22.07.21 20:58, John Roesler wrote:

Hi Ivan,

Thanks for the reply.

1. I think I might have gotten myself confused. I was thinking of this operation as stateless, but now I'm not sure what I was thinking... This operator has to be stateful, right? In that case, I agree that comparing serialized values seems to be the way to do it.

2. Thanks for the confirmation.

3. I continue to be satisfied to let you all hash it out.

Thanks, -John

On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:

Hi all,

1. Actually, I always thought about the serialized byte array only -- at least this is what local stores depend upon, and what Kafka itself depends upon when doing log compaction. I can imagine a case where two different byte arrays deserialize to objects which are `equals` to each other. But I think we can ignore this for now, because IMO the risks related to buggy `equals` implementations outweigh the potential benefits. I will mention the duplicate definition in the KIP.
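The "aggregation that never forwards twice" described above could be sketched as processor logic like the following. This is an illustration of the idea, not the actual Kafka Streams internals; it assumes a WindowStore<K, Long> that counts records per (key, window):

    import org.apache.kafka.streams.kstream.Window;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.WindowStore;

    // Forward only the first record per (key, window); count the rest silently.
    <K, V> void processRecord(final K key, final V value, final Window window,
                              final WindowStore<K, Long> store,
                              final ProcessorContext context) {
        final Long seen = store.fetch(key, window.start());
        if (seen == null) {
            store.put(key, 1L, window.start());
            context.forward(new Windowed<>(key, window), value);  // first record in this window
        } else {
            store.put(key, seen + 1L, window.start());  // duplicate: update the counter, emit nothing
        }
    }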
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
Hi!

I have updated the KIP with the definition of what is considered to be a duplicate record ("The records are considered to be duplicates iff serialized forms of their keys are equal.").

I have also added all the standard overloads for the distinct() method. These overloads are the same as for `count()`. Thank you, John, for noticing this; this is extremely important. In my opinion, some users will want to use in-memory stores here. Although `distinct` is stateful, it's OK to lose state occasionally, because an idempotent handler is expected downstream, which must be able to cope with some duplicates.

Concerning the naming: I have also added more examples of `distinct` name usage to the KIP (Apache Spark, Apache Flink, Apache Beam, Hazelcast Jet -- see https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API#KIP655:WindowedDistinctOperationforKafkaStreamsAPI-Motivation). But this is, of course, not a matter of principle for me; I am ready to be convinced. Sophie and Bruno, do you think that this operation needs another name? What do other people think?

Regards, Ivan

On 22.07.2021 22:03, John Roesler wrote:

Hi Ivan,

Sorry for the late amendment, but I was just reviewing the KIP again for your vote thread, and it struck me that, if this is a stateful operation, we need a few more overloads. Following the example of the other stateful windowed operators, we should have:

- distinct()
- distinct(Named)
- distinct(Materialized)

It's a small update, but an important one, since people will inevitably need to customize the state store for the operation.

Thanks, John

On Thu, 2021-07-22 at 13:58 -0500, John Roesler wrote:

Hi Ivan,

Thanks for the reply.

1. I think I might have gotten myself confused. I was thinking of this operation as stateless, but now I'm not sure what I was thinking... This operator has to be stateful, right? In that case, I agree that comparing serialized values seems to be the way to do it.

2. Thanks for the confirmation.

3. I continue to be satisfied to let you all hash it out.

Thanks, -John

On Tue, 2021-07-20 at 11:42 +0300, Ivan Ponomarev wrote:

Hi all,

1. Actually, I always thought about the serialized byte array only -- at least this is what local stores depend upon, and what Kafka itself depends upon when doing log compaction. I can imagine a case where two different byte arrays deserialize to objects which are `equals` to each other. But I think we can ignore this for now, because IMO the risks related to buggy `equals` implementations outweigh the potential benefits. I will mention the duplicate definition in the KIP.

2. I agree with John, he got my point.

3. Let me gently insist on `distinct`. I believe that an exception to the rule is appropriate here, because the name `distinct()` is ubiquitous. It's not only about the Java Streams API (or .NET LINQ, which appeared earlier and also has `Distinct`): Spark's DataFrame has a `distinct()` method, Hazelcast Jet has a `distinct()` method, and I bet I can find more examples if I search. When we teach Kafka Streams, we always say that KStreams are just like other streaming APIs, and they have roots in SQL queries. Users know what `distinct` is, and they expect it to be in the API.

Regards, Ivan

On 13.07.2021 0:10, John Roesler wrote:

Hi all,

Bruno raised some very good points. I'd like to chime in with additional context.

1. Great point. We faced a similar problem defining KIP-557. For 557, we chose to use the serialized byte array instead of the equals() method, but I think the situation in KIP-655 is a bit different. I think it might make sense to use the equals() method here, but am curious what Ivan thinks.

2. I figured we'd do nothing. I thought Ivan was just saying that it doesn't make a ton of sense to use it, which I agree with, but it doesn't seem like that means we should prohibit it.

3. FWIW, I don't have a strong feeling either way.

Thanks, -John

On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:

Hi Ivan,

Thank you for the KIP! Some aspects are not clear to me from the KIP, and I have a proposal.

1. The KIP does not describe the criteria that define a duplicate. Could you add a definition of duplicate to the KIP?

2. The KIP does not describe what happens if distinct() is applied on a hopping window. On the DSL level, I do not see how you can avoid users applying distinct() on a hopping window, i.e., you cannot avoid it at compile time; you need to check it at runtime and throw an exception. Is this correct, or am I missing something?

3. I would also like to back a proposal by Sophie. She proposed to use deduplicate() instead of distinct(), since the other DSL operations are also verbs. I do not think that SQL and the Java Stream API are good arguments to not use a verb.

Best, Bruno

On 10.07.21 19:11, John Roesler wrote:

Hi Ivan,

Sorry for the silence! I have just re-read the proposal. To summarize, you are now only proposing the zero-arg distinct() method to be added to TimeWindowedKStream and SessionWindowedKStream, right?
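Spelled out on the TimeWindowedKStream interface, the overload set John describes would presumably mirror count(), along these lines (illustrative signatures; the KIP page is authoritative):

    // Presumed shape of the proposed overloads, mirroring count().
    // Assumes org.apache.kafka.common.utils.Bytes,
    // org.apache.kafka.streams.kstream.* and
    // org.apache.kafka.streams.state.WindowStore.
    KTable<Windowed<K>, V> distinct();
    KTable<Windowed<K>, V> distinct(final Named named);
    KTable<Windowed<K>, V> distinct(
        final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);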
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
Hi all,

1. Actually, I always thought about the serialized byte array only -- at least this is what local stores depend upon, and what Kafka itself depends upon when doing log compaction. I can imagine a case where two different byte arrays deserialize to objects which are `equals` to each other. But I think we can ignore this for now, because IMO the risks related to buggy `equals` implementations outweigh the potential benefits. I will mention the duplicate definition in the KIP.

2. I agree with John, he got my point.

3. Let me gently insist on `distinct`. I believe that an exception to the rule is appropriate here, because the name `distinct()` is ubiquitous. It's not only about the Java Streams API (or .NET LINQ, which appeared earlier and also has `Distinct`): Spark's DataFrame has a `distinct()` method, Hazelcast Jet has a `distinct()` method, and I bet I can find more examples if I search. When we teach Kafka Streams, we always say that KStreams are just like other streaming APIs, and they have roots in SQL queries. Users know what `distinct` is, and they expect it to be in the API.

Regards, Ivan

On 13.07.2021 0:10, John Roesler wrote:

Hi all,

Bruno raised some very good points. I'd like to chime in with additional context.

1. Great point. We faced a similar problem defining KIP-557. For 557, we chose to use the serialized byte array instead of the equals() method, but I think the situation in KIP-655 is a bit different. I think it might make sense to use the equals() method here, but am curious what Ivan thinks.

2. I figured we'd do nothing. I thought Ivan was just saying that it doesn't make a ton of sense to use it, which I agree with, but it doesn't seem like that means we should prohibit it.

3. FWIW, I don't have a strong feeling either way.

Thanks, -John

On Mon, Jul 12, 2021, at 09:14, Bruno Cadonna wrote:

Hi Ivan,

Thank you for the KIP! Some aspects are not clear to me from the KIP, and I have a proposal.

1. The KIP does not describe the criteria that define a duplicate. Could you add a definition of duplicate to the KIP?

2. The KIP does not describe what happens if distinct() is applied on a hopping window. On the DSL level, I do not see how you can avoid users applying distinct() on a hopping window, i.e., you cannot avoid it at compile time; you need to check it at runtime and throw an exception. Is this correct, or am I missing something?

3. I would also like to back a proposal by Sophie. She proposed to use deduplicate() instead of distinct(), since the other DSL operations are also verbs. I do not think that SQL and the Java Stream API are good arguments to not use a verb.

Best, Bruno

On 10.07.21 19:11, John Roesler wrote:

Hi Ivan,

Sorry for the silence! I have just re-read the proposal. To summarize, you are now only proposing the zero-arg distinct() method to be added to TimeWindowedKStream and SessionWindowedKStream, right? I'm in favor of this proposal.

Thanks, John

On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:

Hello everyone,

I would like to remind you about KIP-655 and KIP-759, just in case they got lost in your inbox. Now the initial proposal is split into two independent and smaller ones, so they should be easier to review. Of course, if you have time.

Regards, Ivan

On 24.06.2021 18:11, Ivan Ponomarev wrote:

Hello all,

I have rewritten KIP-655, summarizing what was agreed upon during this discussion (now the proposal is much simpler and less invasive). I have also created KIP-759 (the cancelRepartition operation) and started a discussion for it.

Regards, Ivan.

On 04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

- https://issues.apache.org/jira/browse/KAFKA-4835
- https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP and do a separate KIP for it (if necessary, we can "pause" this KIP until the repartition KIP is done). It's a long-standing "issue" and we should resolve it in a general way, I guess. (Did not read all responses in detail yet, so keeping this comment short.)

-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan! That sounds like a great plan to me. Two smaller KIPs are easier to agree on than one big one.

I agree hopping and sliding windows will actually have a duplicating effect. We can avoid adding distinct() to the sliding window interface, but hopping windows are just a different parameterization of epoch-aligned windows. It seems we can't do much about that except document the issue.

Thanks, John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic, it simplifies things a lot! I also felt uncomfortable due to the fact that the proposed `distinct()` is not somewhere near `count()` and `reduce(..)`.
[VOTE] KIP-655: Windowed "Distinct" Operation for Kafka Streams
Hello all!

I'd like to start the vote for KIP-655, which proposes the zero-arg distinct() method to be added to TimeWindowedKStream and SessionWindowedKStream.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API

Regards, Ivan
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
Hi, John!

> To summarize, you are now only proposing the zero-arg distinct() method to be added to TimeWindowedKStream and SessionWindowedKStream, right?

Right, yes, it's that simple! Okay, since you and Israel Ekpo both agreed, I'll start the voting thread and will start the implementation. But this change alone, without .cancelRepartition(), will not solve my problem :-) so I would also like to see your thoughts in the KIP-759 discussion thread.

Regards, Ivan

On 10.07.2021 20:11, John Roesler wrote:

Hi Ivan,

Sorry for the silence! I have just re-read the proposal. To summarize, you are now only proposing the zero-arg distinct() method to be added to TimeWindowedKStream and SessionWindowedKStream, right? I'm in favor of this proposal.

Thanks, John

On Sat, Jul 10, 2021, at 10:18, Ivan Ponomarev wrote:

Hello everyone,

I would like to remind you about KIP-655 and KIP-759, just in case they got lost in your inbox. Now the initial proposal is split into two independent and smaller ones, so they should be easier to review. Of course, if you have time.

Regards, Ivan

On 24.06.2021 18:11, Ivan Ponomarev wrote:

Hello all,

I have rewritten KIP-655, summarizing what was agreed upon during this discussion (now the proposal is much simpler and less invasive). I have also created KIP-759 (the cancelRepartition operation) and started a discussion for it.

Regards, Ivan.

On 04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

- https://issues.apache.org/jira/browse/KAFKA-4835
- https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP and do a separate KIP for it (if necessary, we can "pause" this KIP until the repartition KIP is done). It's a long-standing "issue" and we should resolve it in a general way, I guess. (Did not read all responses in detail yet, so keeping this comment short.)

-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan! That sounds like a great plan to me. Two smaller KIPs are easier to agree on than one big one.

I agree hopping and sliding windows will actually have a duplicating effect. We can avoid adding distinct() to the sliding window interface, but hopping windows are just a different parameterization of epoch-aligned windows. It seems we can't do much about that except document the issue.

Thanks, John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic, it simplifies things a lot! I also felt uncomfortable due to the fact that the proposed `distinct()` is not somewhere near `count()` and `reduce(..)`. But `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like a correct option to me because of the issue with the unneeded repartitioning. The bold idea that we can just CANCEL the repartitioning didn't come to my mind. What seemed to me a single problem is in fact two unrelated problems: the `distinct` operation and cancelling the unneeded repartitioning.

> what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning?

I think a more elegant solution would be not to add a new parameter to `selectKey` and all the other key-changing operations (`map`, `transform`, `flatMap`, ...), but to add a new operator `KStream#cancelRepartitioning()` that resets the `keyChangingOperation` flag for the upstream node. Of course, a "use it only if you know what you're doing" warning is to be added. Well, it's a topic for a separate KIP!

Concerning `distinct()`. If we use the `XXXWindowedKStream` facilities, then the changes to the API are minimally invasive: we're just adding `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and that's all. We can now define `distinct` as an operation that returns only a first record that falls into a new window, and filters out all the other records that fall into an already existing window. BTW, we can mock the behaviour of such an operation with `TopologyTestDriver` using `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`. ;-)

Consider the following example (record times are in seconds):

// three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
// 'late arrivals'
7, 22, 35

1. 'Epoch-aligned deduplication' using tumbling windows:

.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()

produces

(key@[0/1], 4)
(key@[2/3], 23)
(key@[3/4], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because they produce multiple intersected windows, so that one record can be multiplied, but we want deduplication.

3. SessionWindows work for 'data-aligned deduplication':

.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
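Ivan's TopologyTestDriver trick, written out as a full chain. A sketch: STOP is an arbitrary sentinel value, and a String-keyed, String-valued `stream` is assumed:

    import java.time.Duration;
    import org.apache.kafka.streams.kstream.*;

    final String STOP = "STOP";

    // Mock of windowed distinct(): the first record per (key, window) survives;
    // any further record collapses the aggregate to STOP, which is filtered out.
    KTable<Windowed<String>, String> deduplicated = stream
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        .reduce((left, right) -> STOP)
        .filterNot((windowedKey, value) -> STOP.equals(value));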
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
Hello everyone,

I would like to remind you about KIP-655 and KIP-759, just in case they got lost in your inbox. Now the initial proposal is split into two independent and smaller ones, so they should be easier to review. Of course, if you have time.

Regards, Ivan

On 24.06.2021 18:11, Ivan Ponomarev wrote:

Hello all,

I have rewritten KIP-655, summarizing what was agreed upon during this discussion (now the proposal is much simpler and less invasive). I have also created KIP-759 (the cancelRepartition operation) and started a discussion for it.

Regards, Ivan.

On 04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

- https://issues.apache.org/jira/browse/KAFKA-4835
- https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP and do a separate KIP for it (if necessary, we can "pause" this KIP until the repartition KIP is done). It's a long-standing "issue" and we should resolve it in a general way, I guess. (Did not read all responses in detail yet, so keeping this comment short.)

-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan! That sounds like a great plan to me. Two smaller KIPs are easier to agree on than one big one.

I agree hopping and sliding windows will actually have a duplicating effect. We can avoid adding distinct() to the sliding window interface, but hopping windows are just a different parameterization of epoch-aligned windows. It seems we can't do much about that except document the issue.

Thanks, John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic, it simplifies things a lot! I also felt uncomfortable due to the fact that the proposed `distinct()` is not somewhere near `count()` and `reduce(..)`. But `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like a correct option to me because of the issue with the unneeded repartitioning. The bold idea that we can just CANCEL the repartitioning didn't come to my mind. What seemed to me a single problem is in fact two unrelated problems: the `distinct` operation and cancelling the unneeded repartitioning.

> what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning?

I think a more elegant solution would be not to add a new parameter to `selectKey` and all the other key-changing operations (`map`, `transform`, `flatMap`, ...), but to add a new operator `KStream#cancelRepartitioning()` that resets the `keyChangingOperation` flag for the upstream node. Of course, a "use it only if you know what you're doing" warning is to be added. Well, it's a topic for a separate KIP!

Concerning `distinct()`. If we use the `XXXWindowedKStream` facilities, then the changes to the API are minimally invasive: we're just adding `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and that's all. We can now define `distinct` as an operation that returns only a first record that falls into a new window, and filters out all the other records that fall into an already existing window. BTW, we can mock the behaviour of such an operation with `TopologyTestDriver` using `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`. ;-)

Consider the following example (record times are in seconds):

// three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
// 'late arrivals'
7, 22, 35

1. 'Epoch-aligned deduplication' using tumbling windows:

.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()

produces

(key@[0/1], 4)
(key@[2/3], 23)
(key@[3/4], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because they produce multiple intersected windows, so that one record can be multiplied, but we want deduplication.

3. SessionWindows work for 'data-aligned deduplication':

.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()

produces only

([key@4000/4000], 4)
([key@23000/23000], 23)

because all the records with timestamps greater than 7 are stuck together in one session. Setting the inactivity gap to 9 seconds will return three records:

([key@4000/4000], 4)
([key@23000/23000], 23)
([key@34000/34000], 34)

WDYT? If you like this variant, I will re-write KIP-655 and propose a separate KIP for `cancelRepartitioning` (or whatever name we choose for it).

Regards, Ivan

On 24.05.2021 22:32, John Roesler wrote:

Hey there, Ivan!

In typical fashion, I'm going to make a somewhat outlandish proposal. I'm hoping that we can side-step some of the complications that have arisen. Please bear with me.

It seems like `distinct()` is not fundamentally unlike other windowed "aggregation" operations.
[DISCUSS] KIP-759: Unneeded repartition canceling
Hello,

I'd like to start a discussion for KIP-759: https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling

This is an offshoot of the discussion of KIP-655 for a `distinct` operator, which turned out to be a separate proposal. The proposal itself is quite trivial; however, we still might consider alternatives (see the 'Possible Alternatives' section).

Regards, Ivan
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
Hello all,

I have rewritten KIP-655, summarizing what was agreed upon during this discussion (now the proposal is much simpler and less invasive). I have also created KIP-759 (the cancelRepartition operation) and started a discussion for it.

Regards, Ivan.

On 04.06.2021 8:15, Matthias J. Sax wrote:

Just skimmed over the thread -- first of all, I am glad that we could merge KIP-418 and ship it :)

About the re-partitioning concerns, there are already two tickets for it:

- https://issues.apache.org/jira/browse/KAFKA-4835
- https://issues.apache.org/jira/browse/KAFKA-10844

Thus, it seems best to exclude this topic from this KIP and do a separate KIP for it (if necessary, we can "pause" this KIP until the repartition KIP is done). It's a long-standing "issue" and we should resolve it in a general way, I guess. (Did not read all responses in detail yet, so keeping this comment short.)

-Matthias

On 6/2/21 6:35 AM, John Roesler wrote:

Thanks, Ivan! That sounds like a great plan to me. Two smaller KIPs are easier to agree on than one big one.

I agree hopping and sliding windows will actually have a duplicating effect. We can avoid adding distinct() to the sliding window interface, but hopping windows are just a different parameterization of epoch-aligned windows. It seems we can't do much about that except document the issue.

Thanks, John

On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:

Hi John!

I think that your proposal is just fantastic, it simplifies things a lot! I also felt uncomfortable due to the fact that the proposed `distinct()` is not somewhere near `count()` and `reduce(..)`. But `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like a correct option to me because of the issue with the unneeded repartitioning. The bold idea that we can just CANCEL the repartitioning didn't come to my mind. What seemed to me a single problem is in fact two unrelated problems: the `distinct` operation and cancelling the unneeded repartitioning.

> what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning?

I think a more elegant solution would be not to add a new parameter to `selectKey` and all the other key-changing operations (`map`, `transform`, `flatMap`, ...), but to add a new operator `KStream#cancelRepartitioning()` that resets the `keyChangingOperation` flag for the upstream node. Of course, a "use it only if you know what you're doing" warning is to be added. Well, it's a topic for a separate KIP!

Concerning `distinct()`. If we use the `XXXWindowedKStream` facilities, then the changes to the API are minimally invasive: we're just adding `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and that's all. We can now define `distinct` as an operation that returns only a first record that falls into a new window, and filters out all the other records that fall into an already existing window. BTW, we can mock the behaviour of such an operation with `TopologyTestDriver` using `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`. ;-)

Consider the following example (record times are in seconds):

// three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
// 'late arrivals'
7, 22, 35

1. 'Epoch-aligned deduplication' using tumbling windows:

.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()

produces

(key@[0/1], 4)
(key@[2/3], 23)
(key@[3/4], 34)

-- that is, one record per epoch-aligned window.

2. Hopping and sliding windows do not make much sense here, because they produce multiple intersected windows, so that one record can be multiplied, but we want deduplication.

3. SessionWindows work for 'data-aligned deduplication':

.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()

produces only

([key@4000/4000], 4)
([key@23000/23000], 23)

because all the records with timestamps greater than 7 are stuck together in one session. Setting the inactivity gap to 9 seconds will return three records:

([key@4000/4000], 4)
([key@23000/23000], 23)
([key@34000/34000], 34)

WDYT? If you like this variant, I will re-write KIP-655 and propose a separate KIP for `cancelRepartitioning` (or whatever name we choose for it).

Regards, Ivan

On 24.05.2021 22:32, John Roesler wrote:

Hey there, Ivan!

In typical fashion, I'm going to make a somewhat outlandish proposal. I'm hoping that we can side-step some of the complications that have arisen. Please bear with me.

It seems like `distinct()` is not fundamentally unlike other windowed "aggregation" operations. Your concern about unnecessary repartitioning seems to apply just as well to `count()` as to `distinct()`. This has come up before, but I don't remember when: what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning?
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
Hi John! I think that your proposal is just fantastic, it simplifies things a lot! I also felt uncomfortable due to the fact that the proposed `distinct()` is not somewhere near `count()` and `reduce(..)`. But `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like a correct option for me because of the issue with the unneeded repartitioning. The bold idea that we can just CANCEL the repartitioning didn't came to my mind. What seemed to me a single problem is in fact two unrelated problems: `distinct` operation and cancelling the unneeded repartitioning. > what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning? I think a more elegant solution would be not to add a new parameter to `selectKey` and all the other key-changing operations (`map`, `transform`, `flatMap`, ...), but add a new operator `KStream#cancelRepartitioning()` that resets `keyChangingOperation` flag for the upstream node. Of course, "use it only if you know what you're doing" warning is to be added. Well, it's a topic for a separate KIP! Concerning `distinct()`. If we use `XXXWindowedKStream` facilities, then changes to the API are minimally invasive: we're just adding `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and that's all. We can now define `distinct` as an operation that returns only a first record that falls into a new window, and filters out all the other records that fall into an already existing window. BTW, we can mock the behaviour of such an operation with `TopologyTestDriver` using `reduce((l, r) -> STOP)`.filterNot((k, v)->STOP.equals(v)). ;-) Consider the following example (record times are in seconds): //three bursts of variously ordered records 4, 5, 6 23, 22, 24 34, 33, 32 //'late arrivals' 7, 22, 35 1. 'Epoch-aligned deduplication' using tumbling windows: .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct() produces (key@[0/1], 4) (key@[2/3], 23) (key@[3/4], 34) -- that is, one record per epoch-aligned window. 2. Hopping and sliding windows do not make much sense here, because they produce multiple intersected windows, so that one record can be multiplied, but we want deduplication. 3. SessionWindows work for 'data-aligned deduplication'. .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct() produces only ([key@4000/4000], 4) ([key@23000/23000], 23) because all the records bigger than 7 are stuck together in one session. Setting inactivity gap to 9 seconds will return three records: ([key@4000/4000], 4) ([key@23000/23000], 23) ([key@34000/34000], 34) WDYT? If you like this variant, I will re-write KIP-655 and propose a separate KIP for `cancelRepartitioning` (or whatever name we will choose for it). Regards, Ivan 24.05.2021 22:32, John Roesler пишет: Hey there, Ivan! In typical fashion, I'm going to make a somewhat outlandish proposal. I'm hoping that we can side-step some of the complications that have arisen. Please bear with me. It seems like `distinct()` is not fundamentally unlike other windowed "aggregation" operations. Your concern about unnecessary repartitioning seems to apply just as well to `count()` as to `distinct()`. This has come up before, but I don't remember when: what if we introduce a parameter to `selectKey()` that specifies that the caller asserts that the new key does _not_ change the data partitioning? The docs on that parameter would of course spell out all the "rights and responsibilities" of setting it. 
In that case, we could indeed get back to `selectKey(A).windowedBy(B).distinct(...)`, where we get to compose the key mapper and the windowing function without having to carve out a separate domain just for `distinct()`. All the rest of the KStream operations would also benefit. What do you think? Thanks, John

On Sun, May 23, 2021, at 08:09, Ivan Ponomarev wrote: Hello everyone, let me revive the discussion for KIP-655. Now I have some time again and I'm eager to finalize this. Based on what was already discussed, I think that we can split the discussion into three topics for our convenience. The three topics are:
- idExtractor (how should we extract the deduplication key for the record)
- timeWindows (what time windows should we use)
- miscellaneous (naming etc.)

idExtractor

Original proposal: use a (k, v) -> f(k, v) mapper, defaulting to (k, v) -> k. The drawback here is that we must warn the user to choose such a function that sets different IDs for records from different partitions, otherwise the same IDs might not be co-partitioned (and not deduplicated as a result). Additional concern: what should we do when this function returns null? Matthias proposed key-only deduplication: that is, no idExtractor at all, and if we want to use `distinct` for a partic
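(For reference, the reduce-plus-filterNot mock mentioned earlier in this thread can be sketched as follows. This is a minimal illustration, not the proposed API: the topic names are made up, String serdes are assumed, and caching must be disabled so that every window update is forwarded downstream.)

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

final String STOP = "\u0000STOP\u0000"; // sentinel that must never occur in real data

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> input = builder.stream("readings");

input.groupByKey()
     .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
     .reduce((l, r) -> STOP)        // the first value per window survives,
                                    // every later value collapses to STOP
     .toStream()
     .filterNot((windowedKey, value) -> STOP.equals(value))
     .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
     .to("deduplicated-readings");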
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
aggregations, a well-known limitation of the current API. There is actually a KIP <https://cwiki.apache.org/confluence/display/KAFKA/KIP-645%3A+Replace+Windows+with+a+proper+interface> going on in parallel to fix this exact issue and make the windowing interface much more flexible. Maybe instead of re-implementing this windowing interface in a similarly limited fashion for the Distinct operator, we could leverage it here and get all the benefits coming with KIP-645. Specifically, I'm proposing to remove the TimeWindows/etc config from the DistinctParameters class, and move the distinct() method from the KStream interface to the TimeWindowedKStream interface. Since it's semantically similar to a kind of windowed aggregation, it makes sense to align it with the existing windowing framework, ie:

inputStream
    .groupByKey()
    .windowedBy()
    .distinct()

Then we could use data-aligned windows if SlidingWindows is specified in the windowedBy(), and epoch-aligned (or some other kind of enumerable window) if a Windows is specified in windowedBy() (or an EnumerableWindowDefinition once KIP-645 is implemented to replace Windows).

*SlidingWindows*: should forward a record once when it's first seen, and then not again for any identical records that fall into the next N timeUnits. This includes out-of-order records, ie if you have a SlidingWindows of size 10s and process records at time 15s, 20s, 14s then you would just forward the one at 15s. Presumably, if you're using SlidingWindows, you don't care about what falls into exact time boxes, you just want to deduplicate. If you do care about exact time boxing then you should use...

*EnumerableWindowDefinition* (eg *TimeWindows*): should forward only one record per enumerated time window. If you get records at 15s, 20s, 14s where the windows are enumerated at [5,14], [15,24], etc then you forward the record at 15s and also the record at 14s.

Just an idea: not sure if the impedance mismatch would throw users off since the semantics of the distinct windows are slightly different than in the aggregations. But if we don't fit this into the existing windowed framework, then we shouldn't use any existing Windows-type classes at all, imo. ie we should create a new DistinctWindows config class, similar to how stream-stream joins get their own JoinWindows class. I also think that non-windowed deduplication could be useful, in which case we would want to also have the distinct() operator on the KStream interface. One quick note regarding the naming: it seems like the Streams DSL operators are typically named as verbs rather than adjectives, for example #suppress or #aggregate. I get that there's some precedent for 'distinct' specifically, but maybe something like 'deduplicate' would be more appropriate for the Streams API. WDYT?

On Mon, Sep 14, 2020 at 10:04 AM Ivan Ponomarev wrote: Hi Matthias, Thanks for your review! It made me think deeper, and indeed I understood that I was missing some important details. To simplify, let me explain my particular use case first so I can refer to it later. We have a system that collects information about ongoing live sporting events from different sources. The information sources have IDs, and these IDs are the keys of the stream. Each source emits messages concerning sporting events, and we can have many messages about each sporting event from each source. The event ID is extracted from the message.
We need a database of event IDs that were reported at least once by each source (important: events from different sources are considered to be different entities). The requirements are:
1) each new event ID should be written to the database as soon as possible
2) although it's OK, and sometimes even desired, to repeat the notification about an already-known event ID, we wouldn't like our database to be bothered by the same event ID more often than once in a given period of time (say, 15 minutes).

With this example in mind, let me answer your questions:

> (1) Using the `idExtractor` has the issue that data might not be
> co-partitioned as you mentioned in the KIP. Thus, I am wondering if it
> might be better to do deduplication only on the key? If one sets a new
> key upstream (ie, extracts the deduplication id into the key), the
> `distinct` operator could automatically repartition the data and thus we
> would avoid user errors.

Of course, with key-only deduplication + auto-repartitioning we will never run into co-partitioning problems. But in practice, we often don't need repartitioning even if the deduplication ID is different from the key, as in my example above. So here we have a sort of 'performance vs. safety' trade-off. The 'golden middle way' here can be the following: we can form a deduplication ID as KEY + separator + idExtractor(VALUE). In case idExtractor is not provided, we deduplicate by key only (as in the original proposal). Then the idExtractor transforms only the value (and not the
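(A sketch of this composite-ID scheme; the extractEventId helper is hypothetical and the NUL separator is an arbitrary choice. Because the source key is a prefix of the ID, two records with the same ID necessarily have the same key and therefore land in the same partition, so co-partitioning holds without a repartition step.)

import org.apache.kafka.streams.kstream.KeyValueMapper;

static String extractEventId(final String payload) {
    return payload; // placeholder for real parsing logic
}

final KeyValueMapper<String, String, String> dedupId =
        (sourceId, payload) -> sourceId + "\u0000" + extractEventId(payload);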
Re: [VOTE] KIP-418: A method-chaining way to branch KStream
Hello everyone, this is to warn you that the actual implementation of KIP-418 differs from the approved specification in two points: 1. Instead of multiple overloaded variants of Branched.with, we now have Branched.withFunction (for functions) and Branched.withConsumer (for consumers). This is because of compiler warnings about overloading: Function and Consumer are indistinguishable to the Java compiler when supplied as lambdas, and thus we need to name the methods differently. 2. 'Fully covariant' signatures like Consumer<? super KStream<? super K, ? super V>> don't work as expected. Using Consumer<KStream<K, V>> etc. instead. I have updated the KIP. Does anyone object to these changes? Regards, Ivan 06.07.2020 21:43, Matthias J. Sax пишет: I am late, but I am also +1 (binding). -Matthias On 7/6/20 2:16 AM, Ivan Ponomarev wrote: Wow! So excited to hear that! Thanks for your collaboration, now it's my turn to write a PR. Regards, Ivan 04.07.2020 20:16, John Roesler пишет: Hi Ivan, Congratulations! It looks like you have 3 binding and 2 non-binding votes, so you can announce this KIP as accepted and follow up with a PR. Thanks, John On Mon, Jun 29, 2020, at 20:46, Bill Bejeck wrote: Thanks for the KIP Ivan, +1 (binding). -Bill On Mon, Jun 29, 2020 at 7:22 PM Guozhang Wang wrote: +1 (binding). Thanks Ivan! Guozhang On Mon, Jun 29, 2020 at 3:55 AM Jorge Esteban Quilcate Otoya < quilcate.jo...@gmail.com> wrote: This will be a great addition. Thanks Ivan! +1 (non-binding) On Fri, Jun 26, 2020 at 7:07 PM John Roesler wrote: Thanks, Ivan! I’m +1 (binding) -John On Thu, May 28, 2020, at 17:24, Ivan Ponomarev wrote: Hello all! I'd like to start the vote for KIP-418 which proposes deprecation of current `branch` method and provides a method-chaining based API for branching. https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream Regards, Ivan -- -- Guozhang
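(A usage sketch of the implemented methods; the topic names and predicates are invented. `withConsumer` terminates the branch in place, so that branch does not appear in the resulting map; `withFunction` transforms the branch and keeps it in the map under the split prefix plus the given name.)

import java.util.Map;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream = builder.stream("input");

final Map<String, KStream<String, String>> branches = stream
        .split(Named.as("split-"))
        .branch((k, v) -> v.startsWith("A"),
                Branched.withConsumer(s -> s.to("a-records")))   // consumed in place
        .defaultBranch(
                Branched.withFunction(s -> s.mapValues(String::trim), "rest"));

final KStream<String, String> rest = branches.get("split-rest");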
[jira] [Resolved] (KAFKA-12230) Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file system
[ https://issues.apache.org/jira/browse/KAFKA-12230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Ponomarev resolved KAFKA-12230. Resolution: Duplicate Duplicate of KAFKA-12190 > Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file > system > -- > > Key: KAFKA-12230 > URL: https://issues.apache.org/jira/browse/KAFKA-12230 > Project: Kafka > Issue Type: Bug > Reporter: Ivan Ponomarev > Assignee: Ivan Ponomarev > Priority: Minor
[jira] [Created] (KAFKA-12230) Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file system
Ivan Ponomarev created KAFKA-12230: -- Summary: Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file system Key: KAFKA-12230 URL: https://issues.apache.org/jira/browse/KAFKA-12230 Project: Kafka Issue Type: Bug Reporter: Ivan Ponomarev Assignee: Ivan Ponomarev While developing Kafka on a Windows machine, I get some false TopologyTestDriver-based test failures because of a `Files.setPosixFilePermissions` failure with UnsupportedOperationException, see e.g. the stack trace below. Simply catching UnsupportedOperationException together with IOException in StateDirectory solves this issue. {noformat} java.lang.UnsupportedOperationException at java.base/java.nio.file.Files.setPosixFilePermissions(Files.java:2078) at org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:118) at org.apache.kafka.streams.TopologyTestDriver.setupTopology(TopologyTestDriver.java:431) at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:335) at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:306) at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:265) at org.apache.kafka.streams.kstream.internals.KStreamImplTest.shouldSupportForeignKeyTableTableJoinWithKTableFromKStream(KStreamImplTest.java:2751) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119) at java.base
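(A simplified sketch of the fix described in the report; the method name and permission string are illustrative, and the real StateDirectory code differs.)

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;

static void trySetPosixPermissions(final Path stateDir) {
    try {
        Files.setPosixFilePermissions(stateDir, PosixFilePermissions.fromString("rwxr-x---"));
    } catch (final IOException | UnsupportedOperationException e) {
        // Non-POSIX file systems (e.g. NTFS on Windows) throw
        // UnsupportedOperationException; treat it like an I/O failure
        // instead of letting it fail the test.
        System.err.println("Failed to set permissions on " + stateDir + ": " + e);
    }
}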
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
> ...have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal.

Oh, I can explain. We can't fully rely on the builder pattern because of Java type inference limitations. We have to provide type parameters to the builder methods or the code won't compile: see e.g. this https://twitter.com/inponomarev/status/1265053286933159938 and the following discussion with Tagir Valeev. When we came across similar difficulties in KIP-418, we finally decided to add all the necessary overloads to the parameter class. So I just reproduced that approach here.

> (5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high-level overview of how you intend to implement it (that would be easier to grok, compared to reading the PR).

Well, as with any operation at the KStreamImpl level, I'm building a store and a processor node. The KStreamDistinct class is going to be the ProcessorSupplier, with the logic regarding the forwarding/muting of the records located in KStreamDistinct.KStreamDistinctProcessor#process.

Matthias, if you are still reading this :-) a gentle reminder: my PR for the already accepted KIP-418 is still waiting for your review. I think it's better for me to finalize at least one KIP before proceeding to a new one :-) Regards, Ivan

03.09.2020 4:20, Matthias J. Sax пишет: Thanks for the KIP Ivan. Having a built-in deduplication operator is for sure a good addition. Couple of questions: (1) Using the `idExtractor` has the issue that data might not be co-partitioned, as you mentioned in the KIP. Thus, I am wondering if it might be better to do deduplication only on the key? If one sets a new key upstream (ie, extracts the deduplication id into the key), the `distinct` operator could automatically repartition the data and thus we would avoid user errors. (2) What is the motivation for allowing the `idExtractor` to return `null`? Might be good to have some use-case examples for this feature. (2) Is using a `TimeWindow` really what we want? I was wondering if a `SlidingWindow` might be better? Or maybe we need a new type of window? It would be helpful if you could describe potential use cases in more detail. -- I am mainly wondering about hopping windows: each record would always fall into multiple windows and thus would be emitted multiple times, ie, each time a window closes. Is this really a valid use case? It seems that for de-duplication, one wants to have some "expiration time", ie, for each ID, deduplicate all consecutive records with the same ID and emit the first record after the "expiration time" has passed. In terms of a window, this would mean that the window starts at `r.ts` and ends at `r.ts + windowSize`, ie, the window is aligned to the data. TimeWindows are aligned to the epoch though. While `SlidingWindows` also align to the data, for the aggregation use-case they go backward in time, while we need a window that goes forward in time. It's an open question if we can re-purpose `SlidingWindows` -- it might be ok to make the alignment (into the past vs. into the future) an operator-dependent behavior? (3) `isPersistent` -- instead of using this flag, it seems better to allow users to pass in a `Materialized` parameter next to `DistinctParameters` to configure the state store? (4) I am wondering if we should really have 4 overloads for `DistinctParameters.with()`?
It might be better to have one overload with all required parameters, and add optional parameters using the builder pattern? This seems to follow the DSL Grammar proposal. (5) Even if it might be an implementation detail (and maybe the KIP itself does not need to mention it), can you give a high-level overview of how you intend to implement it (that would be easier to grok, compared to reading the PR). -Matthias On 8/23/20 4:29 PM, Ivan Ponomarev wrote: Sorry, I forgot to add [DISCUSS] tag to the topic 24.08.2020 2:27, Ivan Ponomarev пишет: Hello, I'd like to start a discussion for KIP-655. KIP-655: https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API I also opened a proof-of-concept PR for you to experiment with the API: PR#9210: https://github.com/apache/kafka/pull/9210 Regards, Ivan Ponomarev
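(A self-contained sketch of the overload-based pattern Ivan refers to; the class and method names are hypothetical, not the KIP's actual DistinctParameters. The point is that a method introducing a new type parameter must be a static factory overload: a chained builder call could not change a type parameter that Java has already bound at the first call of the chain.)

import java.time.Duration;
import java.util.function.Function;

final class Params<I> {
    final Duration window;
    final Function<String, I> idExtractor;

    private Params(final Duration window, final Function<String, I> idExtractor) {
        this.window = window;
        this.idExtractor = idExtractor;
    }

    // key-only variant: I is fixed to String
    static Params<String> with(final Duration window) {
        return new Params<>(window, key -> key);
    }

    // extractor variant: I is introduced and inferred here, at the first call
    static <I> Params<I> with(final Duration window, final Function<String, I> idExtractor) {
        return new Params<>(window, idExtractor);
    }
}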
Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream
Sorry, I forgot to add [DISCUSS] tag to the topic 24.08.2020 2:27, Ivan Ponomarev пишет: Hello, I'd like to start a discussion for KIP-655. KIP-655: https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API I also opened a proof-of-concept PR for you to experiment with the API: PR#9210: https://github.com/apache/kafka/pull/9210 Regards, Ivan Ponomarev
KIP-655: Windowed "Distinct" Operation for KStream
Hello, I'd like to start a discussion for KIP-655. KIP-655: https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API I also opened a proof-of-concept PR for you to experiment with the API: PR#9210: https://github.com/apache/kafka/pull/9210 Regards, Ivan Ponomarev
[jira] [Created] (KAFKA-10369) Introduce Distinct operation in KStream
Ivan Ponomarev created KAFKA-10369: -- Summary: Introduce Distinct operation in KStream Key: KAFKA-10369 URL: https://issues.apache.org/jira/browse/KAFKA-10369 Project: Kafka Issue Type: Improvement Reporter: Ivan Ponomarev Assignee: Ivan Ponomarev Message deduplication is a common task. One example: we might have multiple data sources, each reporting its state periodically with a relatively high frequency; their current states should be stored in a database. When the actual state changes less frequently than it is reported, we might want to filter out the duplicated messages using Kafka Streams in order to reduce the number of writes to the database. The 'distinct' operation is common in data processing, e.g. * Java Stream has the [distinct() |https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#distinct--] operation, * SQL has the DISTINCT keyword. Hence it is natural to expect similar functionality from Kafka Streams. Kafka Streams Tutorials contains an [example|https://kafka-tutorials.confluent.io/finding-distinct-events/kstreams.html] of how distinct can be emulated, but this example is complicated: it involves low-level coding with a local state store and a custom transformer. It might be much more convenient to have distinct as a first-class DSL operation. Due to the 'infinite' nature of KStream, the distinct operation should be windowed, similar to windowed joins and aggregations for KStreams. -- This message was sent by Atlassian Jira (v8.3.4#803005)
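(The emulation referenced above boils down to something like the following sketch; the store name is made up, the store itself must be registered with the topology separately, and the expiry of old entries -- the 'windowed' part -- is omitted for brevity.)

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

static KStream<String, String> deduplicate(final KStream<String, String> input) {
    return input
            .transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
                private KeyValueStore<String, Long> seen;

                @SuppressWarnings("unchecked")
                @Override
                public void init(final ProcessorContext context) {
                    seen = (KeyValueStore<String, Long>) context.getStateStore("dedup-store");
                }

                @Override
                public String transform(final String key, final String value) {
                    if (seen.get(key) != null) {
                        return null; // already seen: mark as duplicate
                    }
                    seen.put(key, System.currentTimeMillis());
                    return value;
                }

                @Override
                public void close() { }
            }, "dedup-store")
            .filter((key, value) -> value != null); // drop the duplicates
}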
Re: [VOTE] KIP-418: A method-chaining way to branch KStream
Wow! So excited to hear that! Thanks for your collaboration, now it's my turn to write a PR. Regards, Ivan 04.07.2020 20:16, John Roesler пишет: Hi Ivan, Congratulations! It looks like you have 3 binding and 2 non-binding votes, so you can announce this KIP as accepted and follow up with a PR. Thanks, John On Mon, Jun 29, 2020, at 20:46, Bill Bejeck wrote: Thanks for the KIP Ivan, +1 (binding). -Bill On Mon, Jun 29, 2020 at 7:22 PM Guozhang Wang wrote: +1 (binding). Thanks Ivan! Guozhang On Mon, Jun 29, 2020 at 3:55 AM Jorge Esteban Quilcate Otoya < quilcate.jo...@gmail.com> wrote: This will be a great addition. Thanks Ivan! +1 (non-binding) On Fri, Jun 26, 2020 at 7:07 PM John Roesler wrote: Thanks, Ivan! I’m +1 (binding) -John On Thu, May 28, 2020, at 17:24, Ivan Ponomarev wrote: Hello all! I'd like to start the vote for KIP-418 which proposes deprecation of current `branch` method and provides a method-chaining based API for branching. https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream Regards, Ivan -- -- Guozhang
[VOTE] KIP-418: A method-chaining way to branch KStream
Hello all! I'd like to start the vote for KIP-418 which proposes deprecation of current `branch` method and provides a method-chaining based API for branching. https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream Regards, Ivan
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Hello John!

1. > Perhaps it would be better to stick with "as" for now > and just file a Jira to switch them all at the same time [for compatibility with Kotlin]

Fully agree! BTW it's really not a big problem: in Kotlin they have a standard workaround (https://kotlinlang.org/docs/reference/java-interop.html#escaping-for-java-identifiers-that-are-keywords-in-kotlin). So actually this should be a very low-priority issue, if an issue at all.

> I don't understand how your new proposed > methods would work any differently than the ones you already > had proposed in the KIP. It seems like you'd still have to provide > the generic type parameters on the first static factory call. Can you > explain how your new interface proposal differs from the existing KIP?

In the KIP, I didn't clarify which methods should be static. Now I propose the following methods: non-static: withChain(Function), withName(String); static: as(String), with(Function), with(Function, String). The overloaded `with` version that provides both Function and name can be used without causing the type inference problem!!

2. > Regarding making the K,V types covariant also, yes, that would indeed > be nice, but I'm not sure it will actually work.

What I'm keeping in mind is the following example: imagine

static KStream<String, Integer> func(KStream<String, Number> s) {
    return s.mapValues(n -> (Integer) n + 1);
}

BranchedKStream<String, Number> b = s.split()
    .branch((k, v) -> isInteger(v),
            Branched.with(Me::func)); // Won't compile!!

The simple workaround here is to change `func`'s return type from KStream<...Integer> to KStream<...Number>. [On the other hand, we already agreed to remove `withJavaConsumer` from `Branched`, so during code migration I will have to modify my functions' return types anyway -- I mean, from `void` to `KStream`!!]

> the map you're returning is Map<String, KStream<K, V>>, and of course a K is not the same as "? extends K", so it doesn't seem compatible.

I think what you actually meant here is that KStream<? extends K, ? extends V> does not fit as a value for Map<String, KStream<K, V>>. This in particular is not a problem, since KStream<? extends K, ? extends V> can be safely explicitly cast to KStream<K, V> and put into the map. BUT, I'm really afraid of the pitfalls of nested wildcard types. So maybe for now it's better to just admit that the API is not absolutely perfect and accept it as is, that is, Function<? super KStream<K, V>, ? extends KStream<K, V>>. Regards, Ivan

21.05.2020 17:59, John Roesler пишет: Hello Ivan, Thanks for the refinement. Actually, I did not know that "as" would clash with a Kotlin operator. Maybe we should depart from convention and just avoid methods named "as" in the future. The convention is that "as(String name)" is used for the static factory method, whereas "withName(String name)" is an instance method inherited from NamedOperation. If you wish to propose to avoid "as" for compatibility with Kotlin, I might suggest "fromName(String name)", although it's somewhat dubious, since all the other configuration classes use "as". Perhaps it would be better to stick with "as" for now and just file a Jira to switch them all at the same time. Re. 3: Regarding the type inference problem, yes, it's a blemish on all of our configuration objects. The problem is that Java infers the type based on the _first_ method in the chain. While it does consider what the recipient of the method result wants, it only considers the _next_ recipient. Thus, if you call as("foo") and immediately assign it to a Branched variable, Java infers the type correctly.
But when the "next recipient" is a chained method call, like "withChain", then the chained method doesn't bind the type: by definition, withChain is defined on Branched, so Java will take the broadest possible inference and bind the type to Branched, at which point it can't be revised anymore. As a user of Java, this is exceedingly annoying, since it doesn't seem that hard to recursively consider the entire context when inferring the generic type parameters, but this is what we have to work with. To be honest, though, I don't understand how your new proposed methods would work any differently than the ones you already had proposed in the KIP. It seems like you'd still have to provide the generic type parameters on the first static factory call. Can you explain how your new interface proposal differs from the existing KIP? Re. 4: Regarding making the K,V types covariant also, yes, that would indeed be nice, but I'm not sure it will actually work. You might want to give it a try. In the past, we've run into some truly strange interactions between the Java type inferencer and lambdas (and/or anonymous inner classes) in combination with nested covariant types. Another issue is that the value type
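(A self-contained illustration of the inference limitation described above; nothing Kafka-specific about it.)

import java.util.ArrayList;
import java.util.List;

class InferenceDemo {
    static <T> List<T> empty() {
        return new ArrayList<>();
    }

    void demo() {
        final List<String> direct = empty(); // compiles: the target type binds T = String

        // final List<String> chained = empty().subList(0, 0);
        // does not compile: within the chain T is inferred as Object,
        // so subList(...) returns List<Object>, not List<String>
    }
}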
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
? IMHO, we should throw an exception for this case to avoid a `key -> null` entry in the returned Map. Following this train of thought, and if we want to allow the "return null" pattern in general, we need a `withJavaConsumer` that does not add an entry to the Map. Following your proposal, the semantics of `withJavaConsumer` could also be achieved with `withChain`: Branched.withChain(s -> { s.to(); return s; }) Thus, for your proposal `withJavaConsumer` is purely syntactic sugar, while for the first proposal it adds new functionality (if `return null` is not allowed, using `withChain()` it is not possible to "hide" a sub-stream in the result). Furthermore, we might need to allow `return null` in your proposal to allow users to "hide" a sub-stream from the Map. I guess I can be convinced either way. However, if we follow your proposal, I am wondering if we need `withJavaConsumer` at all? Its benefit seems to be small? Also, having a reduced API is usually preferable as it's simpler to learn. -Matthias

On 5/15/20 3:12 PM, Ivan Ponomarev wrote: Hello, John, hello Matthias! Thank you very much for your detailed feedback!

- John,

It looks like you missed my reply on Apr 23rd. For some unknown reason it didn't reach my inbox; fortunately, we have all the emails on the web.
1. Can you propose to deprecate (but not remove) the existing 'branch' method? Done, in the "Compatibility, Deprecation, and Migration Plan" section.
2. [Explain why the 'branch' operator is superior to branching directly off of the parent KStream for the needs of dynamic branching] Done, see an ugly counterexample in the 'Dynamic Branching' section.
3. [`withConsumer` is confusing with Kafka Consumer... maybe `withSink`?] As Matthias noted, `withSink` can also be confusing. I renamed this method to `withJavaConsumer` per Matthias' suggestion.
4. ...It seems like there are two disjoint use cases: EITHER using chain and the result map OR using just the sink. This is discussed below.

- Matthias,

1. [We should rename `KBranchedStream` -> `BranchedKStream`] Done.
2. [Ambiguous phrase about the 'parameterless' version of the `branch` method] Fixed.
3. Overview of newly added methods/interfaces. Done in the `Proposed Changes` section.
4. [Concerning John's note]

> I don't think that using both `withChain()` and `withConsumer()` is the issue, as the KIP clearly states that the result of `withChain()` will be given to the `Consumer`.

Yes, I agree! The issue is really with the `Consumer` and the returned `Map` of `defaultBranch()` and `noDefaultBranch()`. Maybe a reasonable implementation would be to not add the "branch" to the result map if `withConsumer` is used? But is it also an issue? With Kafka Streams, we can split the topology graph at any point. Technically, it's OK to do both: feed the KStream to a [Java]Consumer AND save it in the resulting Map. If one doesn't need the stream in the Map, one simply does not extract it from there :-) In the current version of the KIP it is assumed that the returned map contains ALL the branches, either tagged with IDs explicitly set by the programmer, or with some default auto-generated ids. Dealing with this map is the user's responsibility. What seems to me to be an issue is introducing exceptions to this general rule, like 'swallowing' the streams handled by the provided [Java]Consumers. This can make things complicated. What if a user provides both the name of the branch and a [Java]Consumer? What do they mean in this case? Should we 'swallow' the stream or save it to the map?
There's no point in 'saving the space' in this map, so maybe just leave it as it is? I rewrote the KIP and also fixed a couple of typos. Looking forward to your feedback again! Regards, Ivan.

08.05.2020 22:55, Matthias J. Sax пишет: Thanks for updating the KIP! I also have some minor comments: (1) We should rename `KBranchedStream` -> `BranchedKStream` (Most classes follow this naming pattern now, eg, CoGroupedKStream, TimeWindowedKStream etc -- there is just the "legacy" `KGroupedStream` and `KGroupedTable` that we cannot rename without a breaking change... so we just keep them.) (2) Quote: Both branch and defaultBranch operations also have overloaded parameterless alternatives. I think `branch()` always needs to take a `Predicate`, and I assume you meant that `Branched` is optional. Can you maybe rephrase it accordingly, as `branch()` would not be "parameterless". (3) Can you maybe add an overview (in the "Public Interface" section) of newly added and deprecated methods/classes (cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-150+-+Kafka-Streams+Cogroup#KIP-150-Kafka-StreamsCogroup-PublicInterfaces) (4) What is unclear from the KIP is the interaction of `withConsumer()` and the finally returned `Map`. This rel
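(For orientation, a sketch of how the 'all branches end up in the Map' rule looks in the API as eventually implemented; the topic name and predicate are invented. Each named branch is keyed by the split prefix plus the branch name.)

import java.util.Map;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream = builder.stream("events");

final Map<String, KStream<String, String>> branches = stream
        .split(Named.as("src-"))
        .branch((k, v) -> v.startsWith("A"), Branched.as("a"))
        .defaultBranch(Branched.as("other"));

final KStream<String, String> aStream = branches.get("src-a");
final KStream<String, String> rest = branches.get("src-other");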
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
the issue that it might be confused with a "sink node", ie, writing the KStream to a topic. Maybe `withJavaConsumer` would make it less ambiguous? -Matthias

On 5/8/20 7:13 AM, John Roesler wrote: Hi Ivan, It looks like you missed my reply on Apr 23rd. I think it’s close, but I had a few last comments. Thanks, John

On Sun, May 3, 2020, at 15:39, Ivan Ponomarev wrote: Hello everyone, will someone please take a look at the reworked KIP? I believe that now it follows design principles and takes into account all the arguments discussed here. Regards, Ivan

23.04.2020 2:45, Ivan Ponomarev пишет: Hi, I have read John's "DSL design principles" and have completely rewritten the KIP, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream This version includes all the previous discussion results and follows the design principles, with one exception. The exception is branch(Predicate predicate, Branched branched), which formally violates the 'no more than one parameter' rule, but I think here it is justified. We must provide a predicate for a branch and don't need to provide one for the default branch. Thus for both operations we may use a single Branched parameter class, with an extra method parameter for `branch`. Since the predicate is a natural, necessary part of a branch, no 'proliferation of overloads, deprecations, etc.' is expected here, as it is put in the rationale for the 'single parameter rule'. WDYT, is this KIP mature enough to begin voting? Regards, Ivan

21.04.2020 2:09, Matthias J. Sax пишет: Ivan, no worries about getting sidetracked. Glad to have you back! The DSL improved further in the meantime and we already have a `Named` config object to name operators. It seems reasonable to me to build on this. Furthermore, John did a writeup about "DSL design principles" that we want to follow: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar -- might be worth checking out. -Matthias

On 4/17/20 4:30 PM, Ivan Ponomarev wrote: Hi everyone! Let me revive the discussion of this KIP. I'm very sorry for stopping my participation in the discussion in June 2019. My project work was very intensive then and it didn't leave me spare time. But I think I must finish this, because we invested substantial effort into this discussion and I don't feel entitled to propose other things before this one is finalized. During these months I proceeded with writing and reviewing Kafka Streams-related code. Every time I needed branching, Spring-Kafka's KafkaStreamBrancher class of my invention (the original idea for this KIP) worked for me -- that's another reason why I gave up pushing the KIP forward. When I was coming across the problem with the scope of branches, I worked around it this way:

AtomicReference<KStream<K, V>> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
    .branch()
    .defaultBranch(result::set)
    .onTopOf(someStream);
result.get()...

And yes, of course I don't feel very happy with this approach. I think that Matthias came up with a bright solution in his post from May 24th, 2019.
Let me quote it:

KStream#split() -> KBranchedStream

// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer) -> KBranchedStream

// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String) -> KBranchedStream

// default branch is not easily accessible
// return map of all named sub-streams into current scope
KBranchedStream#default(Consumer) -> Map

// assign custom name to default-branch
// return map of all named sub-streams into current scope
KBranchedStream#default(Function, String) -> Map

// assign a default name for default
// return map of all named sub-streams into current scope
KBranchedStream#defaultBranch(Function) -> Map

// return map of all named sub-streams into current scope
KBranchedStream#noDefaultBranch() -> Map

I believe this would satisfy everyone. Optional names seem to be a good idea: when you don't need to have the branches in the same scope, you just don't use names and you don't risk making your code brittle. Or, you might want to add names just for debugging purposes. Or, finally, you might use the returned Map to have the named branches in the original scope. There was also an input from John Roesler on June 4th, 2019, who suggested using the Named class. I can't comment on this. The idea seems reasonable, but in this matter I'd rather trust people who are more familiar with Streams API design principles than me. Regards, Ivan

08.10.2019 1:38, Matthias J. Sax пишет: I am moving this KIP into "inactive status". Feel free to resume the K
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Hello everyone, will someone please take a look at the reworked KIP? I believe that now it follows design principles and takes into account all the arguments discussed here. Regards, Ivan
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Hi, I have read John's "DSL design principles" and have completely rewritten the KIP, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream This version includes all the previous discussion results and follows the design principles, with one exception. The exception is branch(Predicate predicate, Branched branched), which formally violates the 'no more than one parameter' rule, but I think here it is justified. We must provide a predicate for a branch and don't need to provide one for the default branch. Thus for both operations we may use a single Branched parameter class, with an extra method parameter for `branch`. Since the predicate is a natural, necessary part of a branch, no 'proliferation of overloads, deprecations, etc.' is expected here, as it is put in the rationale for the 'single parameter rule'. WDYT, is this KIP mature enough to begin voting? Regards, Ivan

21.04.2020 2:09, Matthias J. Sax пишет: Ivan, no worries about getting sidetracked. Glad to have you back! The DSL improved further in the meantime and we already have a `Named` config object to name operators. It seems reasonable to me to build on this. Furthermore, John did a writeup about "DSL design principles" that we want to follow: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar -- might be worth checking out. -Matthias

On 4/17/20 4:30 PM, Ivan Ponomarev wrote: Hi everyone! Let me revive the discussion of this KIP. I'm very sorry for stopping my participation in the discussion in June 2019. My project work was very intensive then and it didn't leave me spare time. But I think I must finish this, because we invested substantial effort into this discussion and I don't feel entitled to propose other things before this one is finalized. During these months I proceeded with writing and reviewing Kafka Streams-related code. Every time I needed branching, Spring-Kafka's KafkaStreamBrancher class of my invention (the original idea for this KIP) worked for me -- that's another reason why I gave up pushing the KIP forward. When I was coming across the problem with the scope of branches, I worked around it this way:

AtomicReference<KStream<K, V>> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
    .branch()
    .defaultBranch(result::set)
    .onTopOf(someStream);
result.get()...

And yes, of course I don't feel very happy with this approach. I think that Matthias came up with a bright solution in his post from May 24th, 2019. Let me quote it:

KStream#split() -> KBranchedStream

// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer) -> KBranchedStream

// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be simple as `#branch(p, s->s, "name")`
// or also complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String) -> KBranchedStream

// default branch is not easily accessible
// return map of all named sub-streams into current scope
KBranchedStream#default(Consumer) -> Map

// assign custom name to default-branch
// return map of all named sub-streams into current scope
KBranchedStream#default(Function, String) -> Map

// assign a default name for default
// return map of all named sub-streams into current scope
KBranchedStream#defaultBranch(Function) -> Map

// return map of all named sub-streams into current scope
KBranchedStream#noDefaultBranch() -> Map

I believe this would satisfy everyone.
Optional names seem to be a good idea: when you don't need to have the branches in the same scope, you just don't use names and you don't risk making your code brittle. Or, you might want to add names just for debugging purposes. Or, finally, you might use the returned Map to have the named branches in the original scope. There was also an input from John Roesler on June 4th, 2019, who suggested using the Named class. I can't comment on this. The idea seems reasonable, but in this matter I'd rather trust people who are more familiar with Streams API design principles than me. Regards, Ivan

08.10.2019 1:38, Matthias J. Sax пишет: I am moving this KIP into "inactive status". Feel free to resume the KIP at any point. If anybody else is interested in picking up this KIP, feel free to do so. -Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote: Ivan, did you see my last reply? What do you think about my proposal to mix both approaches and try to get best-of-both worlds? -Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote: Thanks for the input John!

> under your suggestion, it seems that the name is required

If you want to get the `KStream` as part of the `Map` back using a `Function`, yes. If you follow the "embedded chaining" pattern using a `Consumer`, no. Allowing for a default
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
// assign custom name to default-branch
// return map of all named sub-streams into current scope
KBranchedStream#default(Function, String) -> Map

// assign a default name for default
// return map of all named sub-streams into current scope
KBranchedStream#defaultBranch(Function) -> Map

// return map of all named sub-streams into current scope
KBranchedStream#noDefaultBranch() -> Map

Hence, for each sub-stream, the user can choose whether to add a name and return the branch "result" to the calling scope or not. The implementation can also check at runtime that all returned names are unique. The returned Map can be empty, and it's also optional to use the Map. To me, it seems like a good way to get the best of both worlds. Thoughts? -Matthias

On 5/6/19 5:15 PM, John Roesler wrote: Ivan, That's a very good point about the "start" operator in the dynamic case. I had no problem with "split()"; I was just questioning the necessity. Since you've provided a proof of necessity, I'm in favor of the "split()" start operator. Thanks! Separately, I'm interested to see where the present discussion leads. I've written enough JavaScript code in my life to be suspicious of nested closures. You have a good point about using method references (or indeed function literals also work). It should be validating that this was also the JS community's first approach to flattening the logic when their nested closure situation got out of hand. Unfortunately, it's replacing nesting with redirection, both of which disrupt code readability (but in different ways for different reasons). In other words, I agree that function references are *the* first-order solution if the nested code does indeed become a problem. However, the history of JS also tells us that function references aren't the end of the story either, and you can see that by observing that there have been two follow-on eras, as they continue trying to cope with the consequences of living in such a callback-heavy language. First, you have Futures/Promises, which essentially let you convert nested code to method-chained code (Observables/FP is a popular variation on this). Most recently, you have async/await, which is an effort to apply language (not just API) syntax to the problem, and offer the "flattest" possible programming style to solve the problem (because you get back to just one code block per functional unit). Stream-processing is a different domain, and Java+KStreams is nowhere near as callback-heavy as JS, so I don't think we have to take the JS story for granted, but then again, I think we can derive some valuable lessons by looking sideways to adjacent domains. I'm just bringing this up to inspire further/deeper discussion. At the same time, just like JS, we can afford to take an iterative approach to the problem. Separately again, I'm interested in the post-branch merge (and I'd also add join) problem that Paul brought up. We can clearly punt on it, by terminating the nested branches with sink operators. But is there a DSL way to do it? Thanks again for driving this, -John

On Thu, May 2, 2019 at 7:39 PM Paul Whalen <pgwha...@gmail.com> wrote: Ivan, I'll definitely forfeit my point on the clumsiness of the branch(predicate, consumer) solution, I don't see any real drawbacks for the dynamic case. IMO the one trade-off to consider at this point is the scope question.
I don’t know if I totally agree that “we rarely need them in the same scope”, since merging the branches back together later seems like a perfectly plausible use case that can be a lot nicer when the branched streams are in the same scope. That being said, for the reasons Ivan listed, I think it is overall the better solution - working around the scope thing is easy enough if you need to. > On May 2, 2019, at 7:00 PM, Ivan Ponomarev wrote: > > Hello everyone, thank you all for joining the discussion! > > Well, I don't think the idea of named branches, be it a LinkedHashMap (no other Map will do, because the order of definition matters) or a `branch` method taking a name and a Consumer, has more advantages than drawbacks. > > In my opinion, the only real positive outcome from Michael's proposal is that all the returned branches are in the same scope. But 1) we rarely need them in the same scope 2) there is a workaround for the scope problem, described in the KIP. > > 'Inlining the complex logic' is not a problem, because we can use method references instead of lambdas. In real-world scenarios you tend to split the complex logic into methods anyway, so the code is going to be clean. > > The drawbacks are strong. The cohesion between predicates and handlers is lost. We have to define predicates in one place, and handlers in another. This
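(A sketch of the 'merge the branches back together' case discussed here, written against the API as eventually implemented; the Order type and its methods are hypothetical.)

import java.util.Map;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

interface Order {
    boolean isPriority();
    Order expedite();
}

static KStream<String, Order> prioritize(final KStream<String, Order> orders) {
    final Map<String, KStream<String, Order>> branches = orders
            .split(Named.as("b-"))
            .branch((key, order) -> order.isPriority(),
                    Branched.withFunction(s -> s.mapValues(Order::expedite), "priority"))
            .defaultBranch(Branched.as("standard"));

    // both branches are in scope via the returned Map, so they can be merged back
    return branches.get("b-priority").merge(branches.get("b-standard"));
}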
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote: Thanks for updating the KIP and your answers.

> this is to make the name similar to String#split that also returns an array, right?

The intent was to avoid name duplication. The return type should _not_ be an array. The current proposal is

stream.branch()
      .branch(Predicate)
      .branch(Predicate)
      .defaultBranch();

IMHO, this reads a little odd, because the first `branch()` does not take any parameters and has different semantics than the later `branch()` calls. Note that, from the code snippet above, it's hidden that the first call is `KStream#branch()` while the others are `KBranchedStream#branch()`, which makes reading the code harder. Because I suggested to rename `addBranch()` -> `branch()`, I thought it might be better to also rename `KStream#branch()` to avoid the naming overlap that seems to be confusing. The following reads much cleaner to me:

stream.split()
      .branch(Predicate)
      .branch(Predicate)
      .defaultBranch();

Maybe there is a better alternative to `split()` though to avoid the naming overlap.

> 'default' is, however, a reserved word, so unfortunately we cannot have a method with such a name :-)

Bummer. Didn't consider this. Maybe we can still come up with a short name? Can you add the interface `KBranchedStream` to the KIP with all its methods? It will be part of the public API and should be contained in the KIP. For example, it's unclear atm what the return type of `defaultBranch()` is. You did not comment on the idea to add a `KBranchedStream#get(int index) -> KStream` method to get the individual branched KStreams. Would be nice to get your feedback about it. It seems you suggest that users would need to write custom utility code otherwise, to access them. We should discuss the pros and cons of both approaches. It feels "incomplete" to me atm, if the API has no built-in support to get the branched KStreams directly. -Matthias

On 4/13/19 2:13 AM, Ivan Ponomarev wrote: Hi all! I have updated the KIP-418 according to the new vision. Matthias, thanks for your comment!

> Renaming KStream#branch() -> #split()

I can see your point: this is to make the name similar to String#split that also returns an array, right? But is it worth the loss of backwards compatibility? We can have an overloaded branch() as well without affecting the existing code. Maybe the old array-based `branch` method should be deprecated, but this is a subject for discussion.

> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(), KBranchedStream#defaultBranch() -> BranchingKStream#default()

Totally agree with the 'addBranch->branch' rename. 'default' is, however, a reserved word, so unfortunately we cannot have a method with such a name :-)

> defaultBranch() does take a `Predicate` as argument, but I think that is not required?

Absolutely! I think that was just a copy-paste error or something. Dear colleagues, please revise the new version of the KIP and Paul's PR (https://github.com/apache/kafka/pull/6512) Any new suggestions/objections? Regards, Ivan

11.04.2019 11:47, Matthias J. Sax пишет: Thanks for driving the discussion of this KIP. It seems that everybody agrees that the current branch() method using arrays is not optimal. I had a quick look into the PR and I like the overall proposal. There are some minor things we need to consider.
I would recommend the following renaming:

    KStream#branch() -> #split()
    KBranchedStream#addBranch() -> BranchingKStream#branch()
    KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names. In the current PR, defaultBranch() does take a `Predicate` as argument, but I think that is not required?

Also, we should consider KIP-307, that was recently accepted and is currently implemented: https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL I.e., we should add overloads that accept a `Named` parameter.

For the issue that the created `KStream` objects are in different scopes: could we extend `KBranchedStream` with a `get(int index)` method that returns the corresponding "branched" result `KStream` object? Maybe the second argument of `addBranch()` should not be a `Consumer` but a `Function`, and `get()` could return whatever the `Function` returns?

Finally, I would also suggest to update the KIP with the current proposal. That makes it easier to review.

-Matthias

On 3/31/19 12:22 PM, Paul Whalen wrote:

Ivan, I'm a bit of a novice here as well, but I think it makes sense for you to revise the KIP and continue the discussion. Obviously we'll need some buy-in from committers that have actual binding votes on whether the KIP could be adopted. It would be great to hear if they think this is a good idea overall.
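To make the open questions concrete, the interface under discussion might be written down as follows. This is only a sketch of the thread's working proposal, not a released API; all names and signatures are still up for debate, and `get(int index)` is Matthias's suggestion rather than part of the PR:

    import java.util.function.Consumer;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Predicate;

    // Sketch of the proposed chaining API; nothing here is final.
    public interface KBranchedStream<K, V> {

        // Adds one more branch; records matching the predicate go to the
        // KStream handed to the consumer.
        KBranchedStream<K, V> branch(Predicate<? super K, ? super V> predicate,
                                     Consumer<? super KStream<K, V>> consumer);

        // Suggested addition: direct access to the i-th branched stream,
        // so callers can keep working with it in the original scope.
        KStream<K, V> get(int index);

        // Terminal operations; the return type of defaultBranch() is one
        // of the open questions above.
        void defaultBranch(Consumer<? super KStream<K, V>> consumer);
        void noDefaultBranch();
    }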
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Hi all! I have updated the KIP-418 according to the new vision.

Matthias, thanks for your comment!

> Renaming KStream#branch() -> #split()

I can see your point: this is to make the name similar to String#split that also returns an array, right? But is it worth the loss of backwards compatibility? We can have an overloaded branch() as well without affecting the existing code. Maybe the old array-based `branch` method should be deprecated, but this is a subject for discussion.

> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(), KBranchedStream#defaultBranch() -> BranchingKStream#default()

Totally agree with the 'addBranch -> branch' rename. 'default' is, however, a reserved word, so unfortunately we cannot have a method with such name :-)

> defaultBranch() does take a `Predicate` as argument, but I think that is not required?

Absolutely! I think that was just a copy-paste error or something.

Dear colleagues, please review the new version of the KIP and Paul's PR (https://github.com/apache/kafka/pull/6512). Any new suggestions/objections?

Regards, Ivan

On 11.04.2019 11:47, Matthias J. Sax wrote:

Thanks for driving the discussion of this KIP. It seems that everybody agrees that the current branch() method using arrays is not optimal. I had a quick look into the PR and I like the overall proposal. There are some minor things we need to consider.

I would recommend the following renaming:

    KStream#branch() -> #split()
    KBranchedStream#addBranch() -> BranchingKStream#branch()
    KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names. In the current PR, defaultBranch() does take a `Predicate` as argument, but I think that is not required?

Also, we should consider KIP-307, that was recently accepted and is currently implemented: https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL I.e., we should add overloads that accept a `Named` parameter.

For the issue that the created `KStream` objects are in different scopes: could we extend `KBranchedStream` with a `get(int index)` method that returns the corresponding "branched" result `KStream` object? Maybe the second argument of `addBranch()` should not be a `Consumer` but a `Function`, and `get()` could return whatever the `Function` returns?

Finally, I would also suggest to update the KIP with the current proposal. That makes it easier to review.

-Matthias

On 3/31/19 12:22 PM, Paul Whalen wrote:

Ivan, I'm a bit of a novice here as well, but I think it makes sense for you to revise the KIP and continue the discussion. Obviously we'll need some buy-in from committers that have actual binding votes on whether the KIP could be adopted. It would be great to hear if they think this is a good idea overall. I'm not sure if that happens just by starting a vote, or if there is generally some indication of interest beforehand.

That being said, I'll continue the discussion a bit: assuming we do move forward with the solution of "stream.branch() returns KBranchedStream", do we deprecate "stream.branch(...) returns KStream[]"? I would favor deprecating, since having two mutually exclusive APIs that accomplish the same thing is confusing, especially when they're fairly similar anyway. We just need to be sure we're not making something impossible/difficult that is currently possible/easy.

Regarding my PR - I think the general structure would work, it's just a little sloppy overall in terms of naming and clarity.
In particular, passing in the "predicates" and "children" lists, which get modified in KBranchedStream but read all the way down in KStreamLazyBranch, is a bit complicated to follow.

Thanks, Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev wrote:

Hi Paul! I read your code carefully and now I am fully convinced: your proposal looks better and should work. We just have to document the crucial fact that the KStream consumers are invoked as they're added. And then it's all going to be very nice.

What shall we do now? I should re-write the KIP and resume the discussion here, right?

Why do you say that your PR 'should not be even a starting point if we go in this direction'? To me it looks like a good starting point. But as a novice in this project I might be missing some important details.

Regards, Ivan

On 28.03.2019 17:38, Paul Whalen wrote:

Ivan, Maybe I’m missing the point, but I believe the stream.branch() solution supports this. The couponIssuer::set* consumers will be invoked as they’re added, not during streamsBuilder.build(). So the user still ought to be able to call couponIssuer.coupons() afterward and depend on the branched streams having been set.

The issue I mean to point out is that it is hard to access the branched streams in the same scope as the original stream (that is, not inside the couponIssuer), which is a problem with both proposed solutions.
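For readers who have not opened the PR: the shared-lists mechanism Paul describes can be pictured roughly as below. This is a simplified, hypothetical sketch modeled on the idea, not the PR's actual classes; the topology plumbing is stubbed out:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Predicate;

    // KBranchedStream and the eventual branching processor share the same
    // mutable lists, so branches registered after the processor node was
    // created are still visible to it at processing time.
    public class LazyBranchSketch<K, V> {

        private final List<Predicate<? super K, ? super V>> predicates = new ArrayList<>();
        private final List<KStream<K, V>> children = new ArrayList<>();

        public LazyBranchSketch<K, V> branch(Predicate<? super K, ? super V> predicate,
                                             Consumer<? super KStream<K, V>> consumer) {
            final KStream<K, V> child = newChildStream(); // would add a child node to the topology
            predicates.add(predicate);
            children.add(child);
            consumer.accept(child); // the consumer runs as the branch is added
            return this;
        }

        // The branching processor scans the very same list while processing:
        int branchIndexFor(final K key, final V value) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(key, value)) {
                    return i;
                }
            }
            return -1; // no match: the record goes to the default branch, if any
        }

        private KStream<K, V> newChildStream() {
            throw new UnsupportedOperationException("topology plumbing omitted in this sketch");
        }
    }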
Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements
Hi all! I was about to write another KIP, but found out that KIP-401 addresses exactly the problem I faced. So let me jump into your discussion and ask you to assess another idea.

I fully agree with the KIP-401's motivation part. E.g., in my project I had to invent a wrapper class that hides the details of KeyValueStore management from the business logic. Of course, this should be done better in the KStreams API.

But I was about to look at this problem from another side and propose a simple alternative in the high-level DSL, which will not fit all the cases, but most of them. Hence my idea does not exclude Paul's proposal.

What if we restrict ourselves to *only one* KeyValueStore and propose a method that resembles the `aggregate` and `reduce` methods, like this:

    stream
      .map(...)
      .filter(...)
      .transform((k, v, s) -> {...}, Transformed.with(...))

where

* k, v -- input key & value
* s -- a KeyValueStore provided as an argument
* the return value of the lambda should be KeyValue.pair(...)
* Transformed.with... is a builder, used in order to define the Transformer and KeyValueStore building parameters. Some of these parameters should be:
** the store's KeySerde,
** the store's ValueSerde,
** whether the store is persistent or in-memory,
** the store's name -- an optional parameter; the system should be able to devise the name of the store transparently for the user, if we don't want to devise it ourselves/share the store between processors,
** scheduled punctuation.

Imagine we have a KStream, and we need to calculate a 'derivative' stream, that is, a stream of 'deltas' of the provided integer values. This could be achieved as simply as

    stream.transform((key, value, stateStore) -> {
            int previousValue = Optional.ofNullable(stateStore.get(key)).orElse(0);
            stateStore.put(key, value);
            return KeyValue.pair(key, value - previousValue);
        },
        // we do not need to bother with the store name, punctuation etc.;
        // maybe even the Serde part can be omitted, since we can inherit
        // the serdes from the stream by default
        Transformed.with(Serdes.String(), Serdes.Integer()));

The hard part of it is that the new `transform` method definition should be parameterized by six type parameters: the input/output/KeyValueStore key types and the input/output/KeyValueStore value types. However, it seems that all these types can be inferred from the provided lambda and the Transformed.with instance.

What do you think about this?

Regards, Ivan

On 27.03.2019 20:45, Guozhang Wang wrote:

Hello Paul, Thanks for the uploaded PR and the detailed description! I've made a pass on it and left some comments. Overall I think I agree with you that passing in the StoreBuilder directly rather than the store name is more convenient, as it does not require another `addStore` call, but we just need to spend some more documentation effort on educating users about the two ways of connecting their stores. I'm slightly concerned about this education curve, but I can be convinced if most people feel it is worthy.

Guozhang

On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen wrote:

I'd like to resurrect this discussion with a cursory, proof-of-concept implementation of the KIP which combines many of our ideas: https://github.com/apache/kafka/pull/6496. I tried to keep the diff as small as possible for now, just using it to convey the main ideas. But I'll separately address some of our earlier discussion:

- Will there be a new, separate interface for users to implement for the new functionality?
No, to hopefully keep things simple, all of the Processor/TransformerSupplier interfaces will just extend StateStoresSupplier, allowing users to opt in to this functionality by overriding the default implementation that gives an empty list.

- Will the interface allow users to specify the store name, or the entire StoreBuilder?

The entire StoreBuilder, so the Processor/TransformerSupplier can completely encapsulate the name and implementation of a state store if desired.

- Will the old way of specifying store names alongside the supplier when calling stream.process/transform() be deprecated?

No, this is still a legitimate way to wire up Processors/Transformers and their stores. But I would recommend not allowing stream.process/transform() calls that use both store declaration mechanisms (this restriction is not in the proof of concept).

- How will we handle adding the same state store to the topology multiple times because different Processor/TransformerSuppliers declare it?

topology.addStateStore() will be slightly relaxed for convenience, and will allow adding the same StoreBuilder multiple times as long as the exact same StoreBuilder instance is being added for the same store name. This seems to prevent, in practice, the issue of accidentally merging two state stores into one by adding them with the same name. For additional safety, if we wanted to (not in the
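A minimal sketch of the opt-in mechanism described above, using the thread's working name StateStoresSupplier (the shape of the final API may well differ); Ivan's 'delta' example from the message above is reused here as the transformer body, and the store name is invented for the example:

    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.kstream.TransformerSupplier;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    // Sketch of the proposed opt-in interface: the default implementation
    // (no stores) keeps all existing suppliers source-compatible.
    interface StateStoresSupplier {
        default List<StoreBuilder<?>> stateStores() {
            return Collections.emptyList();
        }
    }

    // A supplier that bundles its own store: callers no longer have to
    // call topology.addStateStore() or pass store names to transform().
    class DeltaTransformerSupplier
            implements TransformerSupplier<String, Integer, KeyValue<String, Integer>>,
                       StateStoresSupplier {

        private static final String STORE_NAME = "delta-store"; // illustrative name

        @Override
        public List<StoreBuilder<?>> stateStores() {
            return Collections.singletonList(
                    Stores.keyValueStoreBuilder(
                            Stores.persistentKeyValueStore(STORE_NAME),
                            Serdes.String(), Serdes.Integer()));
        }

        @Override
        public Transformer<String, Integer, KeyValue<String, Integer>> get() {
            return new Transformer<String, Integer, KeyValue<String, Integer>>() {
                private KeyValueStore<String, Integer> store;

                @SuppressWarnings("unchecked")
                @Override
                public void init(final ProcessorContext context) {
                    store = (KeyValueStore<String, Integer>) context.getStateStore(STORE_NAME);
                }

                @Override
                public KeyValue<String, Integer> transform(final String key, final Integer value) {
                    final Integer previous = store.get(key);
                    store.put(key, value);
                    return KeyValue.pair(key, value - (previous == null ? 0 : previous));
                }

                @Override
                public void close() { }
            };
        }
    }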
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Hi Paul! I read your code carefully and now I am fully convinced: your proposal looks better and should work. We just have to document the crucial fact that the KStream consumers are invoked as they're added. And then it's all going to be very nice.

What shall we do now? I should re-write the KIP and resume the discussion here, right?

Why do you say that your PR 'should not be even a starting point if we go in this direction'? To me it looks like a good starting point. But as a novice in this project I might be missing some important details.

Regards, Ivan

On 28.03.2019 17:38, Paul Whalen wrote:

Ivan, Maybe I’m missing the point, but I believe the stream.branch() solution supports this. The couponIssuer::set* consumers will be invoked as they’re added, not during streamsBuilder.build(). So the user still ought to be able to call couponIssuer.coupons() afterward and depend on the branched streams having been set.

The issue I mean to point out is that it is hard to access the branched streams in the same scope as the original stream (that is, not inside the couponIssuer), which is a problem with both proposed solutions. It can be worked around though.

[Also, great to hear additional interest in 401, I’m excited to hear your thoughts!]

Paul

On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev wrote:

Hi Paul!

The idea to postpone the wiring of branches to streamsBuilder.build() also looked great to me at first glance, but ---

> the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that.

You just took the words right out of my mouth, I was just going to write in detail about this issue.

Consider the example from Bill's book, p. 101: say we need to identify customers who have bought coffee and made a purchase in the electronics store, to give them coupons. This is the code I usually write under these circumstances using my 'brancher' class:

    @Setter
    class CouponIssuer {
        private KStream<> coffePurchases;
        private KStream<> electronicsPurchases;

        KStream<...> coupons() {
            return coffePurchases.join(electronicsPurchases...)...whatever
            /* In the real world the code here can be complex, so the creation
               of a separate CouponIssuer class is fully justified, in order
               to separate the classes' responsibilities. */
        }
    }

    CouponIssuer couponIssuer = new CouponIssuer();

    new KafkaStreamsBrancher<>()
        .branch(predicate1, couponIssuer::setCoffePurchases)
        .branch(predicate2, couponIssuer::setElectronicsPurchases)
        .onTopOf(transactionStream);

    /* Alas, this won't work if we're going to wire up everything later,
       without the terminal operation!!! */
    couponIssuer.coupons()...

Does this make sense? In order to properly initialize the CouponIssuer, we need the terminal operation to be called before streamsBuilder.build() is called.

[BTW Paul, I just found out that your KIP-401 is essentially the next KIP I was going to write here. I have some thoughts based on my experience, so I will join the discussion on KIP-401 soon.]

Regards, Ivan

On 28.03.2019 6:29, Paul Whalen wrote:

Ivan, I tried to make a very rough proof of concept of a fluent API based off of KStream here (https://github.com/apache/kafka/pull/6512), and I think I succeeded at removing both cons.

- Compatibility: I was incorrect earlier about compatibility issues; there aren't any direct ones. I was unaware that Java is smart enough to distinguish between a branch(varargs...) returning one thing and branch() with no arguments returning another thing.
- Requiring a terminal method: We don't actually need it. We can just build up the branches in the KBranchedStream, which shares its state with the ProcessorSupplier that will actually do the branching. It's not terribly pretty in its current form, but I think it demonstrates its feasibility.

To be clear, I don't think that pull request should be final, or even a starting point if we go in this direction; I just wanted to see how challenging it would be to get the API working.

I will say, though, that I'm not sure the existing solution could be deprecated in favor of this, which I had originally suggested was a possibility. The reason is that the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that. The KIP proposal has the same issue, though - all this means is that for either solution, deprecating the existing branch(...) is not on the table.

Thanks, Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev wrote:

OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that the branch API needs improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

    new KafkaStreamsBrancher<..>
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Hi Paul!

The idea to postpone the wiring of branches to streamsBuilder.build() also looked great to me at first glance, but ---

> the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that.

You just took the words right out of my mouth, I was just going to write in detail about this issue.

Consider the example from Bill's book, p. 101: say we need to identify customers who have bought coffee and made a purchase in the electronics store, to give them coupons. This is the code I usually write under these circumstances using my 'brancher' class:

    @Setter
    class CouponIssuer {
        private KStream<> coffePurchases;
        private KStream<> electronicsPurchases;

        KStream<...> coupons() {
            return coffePurchases.join(electronicsPurchases...)...whatever
            /* In the real world the code here can be complex, so the creation
               of a separate CouponIssuer class is fully justified, in order
               to separate the classes' responsibilities. */
        }
    }

    CouponIssuer couponIssuer = new CouponIssuer();

    new KafkaStreamsBrancher<>()
        .branch(predicate1, couponIssuer::setCoffePurchases)
        .branch(predicate2, couponIssuer::setElectronicsPurchases)
        .onTopOf(transactionStream);

    /* Alas, this won't work if we're going to wire up everything later,
       without the terminal operation!!! */
    couponIssuer.coupons()...

Does this make sense? In order to properly initialize the CouponIssuer, we need the terminal operation to be called before streamsBuilder.build() is called.

[BTW Paul, I just found out that your KIP-401 is essentially the next KIP I was going to write here. I have some thoughts based on my experience, so I will join the discussion on KIP-401 soon.]

Regards, Ivan

On 28.03.2019 6:29, Paul Whalen wrote:

Ivan, I tried to make a very rough proof of concept of a fluent API based off of KStream here (https://github.com/apache/kafka/pull/6512), and I think I succeeded at removing both cons.

- Compatibility: I was incorrect earlier about compatibility issues; there aren't any direct ones. I was unaware that Java is smart enough to distinguish between a branch(varargs...) returning one thing and branch() with no arguments returning another thing.

- Requiring a terminal method: We don't actually need it. We can just build up the branches in the KBranchedStream, which shares its state with the ProcessorSupplier that will actually do the branching. It's not terribly pretty in its current form, but I think it demonstrates its feasibility.

To be clear, I don't think that pull request should be final, or even a starting point if we go in this direction; I just wanted to see how challenging it would be to get the API working.

I will say, though, that I'm not sure the existing solution could be deprecated in favor of this, which I had originally suggested was a possibility. The reason is that the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that. The KIP proposal has the same issue, though - all this means is that for either solution, deprecating the existing branch(...) is not on the table.

Thanks, Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev wrote:

OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that the branch API needs improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

    new KafkaStreamsBrancher<..>()
        .branch(predicate1, ks -> ..)
        .branch(predicate2, ks -> ..)
        .defaultBranch(ks -> ..)  // optional
        .onTopOf(stream).mapValues(...)  // onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts with the fluency of other KStream methods.

2. (as Paul proposes)

    stream
        .branch(predicate1, ks -> ...)
        .branch(predicate2, ks -> ...)
        .defaultBranch(ks -> ...)  // or noDefault(). Both defaultBranch(..) and noDefault() return void

PROS: Generally follows the way the KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks ->) and noDefault()). And for a user it is very easy to miss the fact that one of the terminal methods should be called. If these methods are not called, we can throw an exception at runtime.

Colleagues, what are your thoughts? Can we do better?

Regards, Ivan

On 27.03.2019 18:46, Ivan Ponomarev wrote:

On 25.03.2019 17:43, Ivan Ponomarev wrote:

Paul, I see your point when you are talking about stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way. Maybe we all should think further.

Let me comment on two of your ideas.
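Ivan's preference (elsewhere in the thread) for emitting unmatched records to a dedicated stream instead of throwing can already be demonstrated with today's array-based branch(), using a catch-all predicate as the last entry. A small illustrative sketch; the Purchase type, predicates, and topic names are invented for the example:

    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Predicate;

    public class DeadLetterBranchExample {

        // Stand-in record type, just for this sketch.
        public interface Purchase { String department(); }

        static void wire(final KStream<String, Purchase> stream) {
            final Predicate<String, Purchase> isCoffee =
                    (key, purchase) -> "coffee".equals(purchase.department());
            final Predicate<String, Purchase> isElectronics =
                    (key, purchase) -> "electronics".equals(purchase.department());

            @SuppressWarnings("unchecked")
            final KStream<String, Purchase>[] branches =
                    stream.branch(isCoffee, isElectronics, (key, purchase) -> true);

            branches[0].to("coffee-purchases");
            branches[1].to("electronics-purchases");
            // Unmatched records are routed to a dedicated topic rather than
            // silently dropped or thrown on:
            branches[2].to("unclassified-purchases");
        }
    }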
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that the branch API needs improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

    new KafkaStreamsBrancher<..>()
        .branch(predicate1, ks -> ..)
        .branch(predicate2, ks -> ..)
        .defaultBranch(ks -> ..)  // optional
        .onTopOf(stream).mapValues(...)  // onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts with the fluency of other KStream methods.

2. (as Paul proposes)

    stream
        .branch(predicate1, ks -> ...)
        .branch(predicate2, ks -> ...)
        .defaultBranch(ks -> ...)  // or noDefault(). Both defaultBranch(..) and noDefault() return void

PROS: Generally follows the way the KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks ->) and noDefault()). And for a user it is very easy to miss the fact that one of the terminal methods should be called. If these methods are not called, we can throw an exception at runtime.

Colleagues, what are your thoughts? Can we do better?

Regards, Ivan

On 27.03.2019 18:46, Ivan Ponomarev wrote:

On 25.03.2019 17:43, Ivan Ponomarev wrote:

Paul, I see your point when you are talking about stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way. Maybe we all should think further.

Let me comment on two of your ideas.

> a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides `default`, because there are scenarios when we want to just silently drop the messages that didn't match any predicate. 2) Throwing an exception in the middle of data-flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.

> it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue.

You mean a runtime exception, when the program is compiled and run? Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from the KStream object? There is a huge cost difference between runtime and compile-time errors. Even if a failure shows up instantly in unit tests, it costs the project more than a compilation failure.

Regards, Ivan

On 25.03.2019 0:38, Paul Whalen wrote:

Ivan, Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch they can call some other terminal method (noDefaultBranch()?) just as easily. In fact I think it creates an opportunity for a nicer API - a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs. That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue.
Especially now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream. With the fluent solution, it ought to work the same way all other operations do - if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.

Thoughts? Best, Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev wrote:

Hello Paul, I'm afraid this won't work, because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'. In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.

I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.

Regards, Ivan

On 24.03.2019 4:02, Paul Whalen wrote:

Ivan, I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing, since it contrasts with the fluency of other KStream method calls. Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently.
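The build-time check Paul describes might amount to bookkeeping along the following lines. This is a hypothetical sketch, not InternalTopologyBuilder's real code; every name here is invented for illustration:

    import java.util.ArrayList;
    import java.util.List;

    // Every branch chain registers itself when started and deregisters when
    // a terminal method (defaultBranch()/noDefaultBranch()) is called;
    // build() then fails fast on anything left dangling.
    public class DanglingBranchTracker {

        private final List<String> openBranchChains = new ArrayList<>();

        void chainStarted(final String nodeName) {
            openBranchChains.add(nodeName);
        }

        void chainTerminated(final String nodeName) {
            openBranchChains.remove(nodeName);
        }

        // Would be invoked from StreamsBuilder.build() in this sketch.
        void checkNoDanglingBranches() {
            if (!openBranchChains.isEmpty()) {
                throw new IllegalStateException(
                        "branch() chains missing defaultBranch()/noDefaultBranch(): "
                                + openBranchChains);
            }
        }
    }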
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Paul, I see your point when you are talking about stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way. Maybe we all should think further.

Let me comment on two of your ideas.

> a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides `default`, because there are scenarios when we want to just silently drop the messages that didn't match any predicate. 2) Throwing an exception in the middle of data-flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.

> it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue.

You mean a runtime exception, when the program is compiled and run? Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from the KStream object? There is a huge cost difference between runtime and compile-time errors. Even if a failure shows up instantly in unit tests, it costs the project more than a compilation failure.

Regards, Ivan

On 25.03.2019 0:38, Paul Whalen wrote:

Ivan, Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch they can call some other terminal method (noDefaultBranch()?) just as easily. In fact I think it creates an opportunity for a nicer API - a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs. That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue. Especially now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream. With the fluent solution, it ought to work the same way all other operations do - if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.

Thoughts? Best, Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev wrote:

Hello Paul, I'm afraid this won't work, because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'. In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.

I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.

Regards, Ivan

On 24.03.2019 4:02, Paul Whalen wrote:

Ivan, I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing, since it contrasts with the fluency of other KStream method calls.
Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently. I think the addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream? Like:

    stream.branch()
          .addBranch(predicate1, this::handle1)
          .addBranch(predicate2, this::handle2)
          .defaultBranch(this::handleDefault);

where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void). This is obviously incompatible with the current API, so the new stream.branch() would have to have a different name, but that seems like a fairly small problem - we could call it something like branched() or branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP? It seems like it does to me, allowing for clear in-line branching while also allowing you to dynamically build up branches off of KBranchedStreams if desired.

Thanks, Paul

On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev wrote:

Hi Bill, Thank you for your reply!

This is how I usually do it:

    void handleFirstCase(KStream ks) {
        ks.filter().mapValues(...)
    }

    void handleSecondCase(KStream ks) {
        ks.selectKey(...).groupByKey()...
    }

    ...

    new KafkaStreamsBrancher()
        .addBranch(
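On Ivan's compile-time-safety question above: splitting the chain across two types at least keeps intermediate and terminal operations apart, though Java cannot actually force the terminal call, which is why the build-time check keeps coming up as a fallback. A purely illustrative sketch using Paul's working names:

    import java.util.function.Consumer;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Predicate;

    // The only exits from the chain are the two terminal methods, both
    // returning void. Note the limitation: a caller can still stop after
    // addBranch(...) and the code compiles, so this is weaker than the
    // "won't compile if used incorrectly" ideal.
    public interface KStreamBrancher<K, V> {

        KStreamBrancher<K, V> addBranch(Predicate<? super K, ? super V> predicate,
                                        Consumer<? super KStream<K, V>> handler);

        void defaultBranch(Consumer<? super KStream<K, V>> handler); // terminal
        void noDefaultBranch();                                      // terminal
    }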
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Hello Paul, I'm afraid this won't work, because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'. In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.

I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.

Regards, Ivan

On 24.03.2019 4:02, Paul Whalen wrote:

Ivan, I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing, since it contrasts with the fluency of other KStream method calls. Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently. I think the addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream? Like:

    stream.branch()
          .addBranch(predicate1, this::handle1)
          .addBranch(predicate2, this::handle2)
          .defaultBranch(this::handleDefault);

where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void). This is obviously incompatible with the current API, so the new stream.branch() would have to have a different name, but that seems like a fairly small problem - we could call it something like branched() or branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP? It seems like it does to me, allowing for clear in-line branching while also allowing you to dynamically build up branches off of KBranchedStreams if desired.

Thanks, Paul

On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev wrote:

Hi Bill, Thank you for your reply!

This is how I usually do it:

    void handleFirstCase(KStream ks) {
        ks.filter().mapValues(...)
    }

    void handleSecondCase(KStream ks) {
        ks.selectKey(...).groupByKey()...
    }

    ...

    new KafkaStreamsBrancher()
        .addBranch(predicate1, this::handleFirstCase)
        .addBranch(predicate2, this::handleSecondCase)
        .onTopOf()

Regards, Ivan

On 22.03.2019 1:34, Bill Bejeck wrote:

Hi Ivan, Thanks for the KIP. I have one question: the KafkaStreamsBrancher takes a Consumer as a second argument, which returns nothing, and the example in the KIP shows each stream from the branch using a terminal node (KStream#to() in this case). Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing and not necessarily use a terminal node on the branched stream immediately? For example, using today's logic as-is, if we had something like this:

    KStream[] branches = originalStream.branch(predicate1, predicate2);
    branches[0].filter().mapValues(...)..
    branches[1].selectKey(...).groupByKey().

Thanks! Bill

On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck wrote:

All, I'd like to jump-start the discussion for KIP-418. Here's the original message:

Hello, I'd like to start a discussion about KIP-418. Please take a look at the KIP if you can, I would appreciate any feedback :)

KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164: https://github.com/apache/kafka/pull/6164

Regards, Ivan Ponomarev
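For reference, the originally proposed brancher can be realized in a few dozen lines on top of the existing array-returning branch(). A minimal illustrative implementation, not the KIP's exact code: it collects predicate/consumer pairs, appends a catch-all predicate when a default branch is requested (which is also why branch order matters), and hands each resulting KStream to its consumer.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Predicate;

    public final class KafkaStreamsBrancher<K, V> {

        private final List<Predicate<? super K, ? super V>> predicates = new ArrayList<>();
        private final List<Consumer<? super KStream<K, V>>> consumers = new ArrayList<>();
        private Consumer<? super KStream<K, V>> defaultConsumer;

        public KafkaStreamsBrancher<K, V> branch(Predicate<? super K, ? super V> predicate,
                                                 Consumer<? super KStream<K, V>> consumer) {
            predicates.add(predicate);
            consumers.add(consumer);
            return this;
        }

        public KafkaStreamsBrancher<K, V> defaultBranch(Consumer<? super KStream<K, V>> consumer) {
            this.defaultConsumer = consumer;
            return this;
        }

        @SuppressWarnings("unchecked")
        public KStream<K, V> onTopOf(KStream<K, V> stream) {
            if (defaultConsumer != null) {
                predicates.add((k, v) -> true); // catch-all for the default branch
            }
            final KStream<K, V>[] branches =
                    stream.branch(predicates.toArray(new Predicate[0]));
            for (int i = 0; i < consumers.size(); i++) {
                consumers.get(i).accept(branches[i]);
            }
            if (defaultConsumer != null) {
                defaultConsumer.accept(branches[branches.length - 1]);
            }
            return stream; // onTopOf returns its argument, per the KIP
        }
    }

Without a default branch, records that match no predicate are dropped, just as with the current array-based API.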
Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream
Hi Bill, Thank you for your reply!

This is how I usually do it:

    void handleFirstCase(KStream ks) {
        ks.filter().mapValues(...)
    }

    void handleSecondCase(KStream ks) {
        ks.selectKey(...).groupByKey()...
    }

    ...

    new KafkaStreamsBrancher()
        .addBranch(predicate1, this::handleFirstCase)
        .addBranch(predicate2, this::handleSecondCase)
        .onTopOf()

Regards, Ivan

On 22.03.2019 1:34, Bill Bejeck wrote:

Hi Ivan, Thanks for the KIP. I have one question: the KafkaStreamsBrancher takes a Consumer as a second argument, which returns nothing, and the example in the KIP shows each stream from the branch using a terminal node (KStream#to() in this case). Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing and not necessarily use a terminal node on the branched stream immediately? For example, using today's logic as-is, if we had something like this:

    KStream[] branches = originalStream.branch(predicate1, predicate2);
    branches[0].filter().mapValues(...)..
    branches[1].selectKey(...).groupByKey().

Thanks! Bill

On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck wrote:

All, I'd like to jump-start the discussion for KIP-418. Here's the original message:

Hello, I'd like to start a discussion about KIP-418. Please take a look at the KIP if you can, I would appreciate any feedback :)

KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164: https://github.com/apache/kafka/pull/6164

Regards, Ivan Ponomarev
Re: [ANNOUNCE] New Committer: Bill Bejeck
Congratulations, Bill! Your 'Kafka Streams in Action' is a great book. These past months it has been travelling with me in my backpack, together with my laptop ))

Regards, Ivan

On 14.02.2019 3:56, Guozhang Wang wrote:

Hello all, The PMC of Apache Kafka is happy to announce that we've added Bill Bejeck as our newest project committer.

Bill has been active in the Kafka community since 2015. He has made significant contributions to the Kafka Streams project with more than 100 PRs and 4 authored KIPs, including the streams topology optimization framework. Bill is also very keen on tightening Kafka's unit-test / system-test coverage, which is of great value to our project codebase.

In addition, Bill has been very active in evangelizing Kafka for stream processing in the community. He has given several Kafka meetup talks in the past year, including a presentation at Kafka Summit SF. He's also authored a book about Kafka Streams (https://www.manning.com/books/kafka-streams-in-action), as well as various posts in public venues like DZone and his personal blog (http://codingjunkie.net/).

We really appreciate the contributions and are looking forward to seeing more from him. Congratulations, Bill!

Guozhang, on behalf of the Apache Kafka PMC
[DISCUSSION] KIP-418: A method-chaining way to branch KStream
Hello, I'd like to start a discussion about KIP-418. Please take a look at the KIP if you can, I would appreciate any feedback :) KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488 PR#6164: https://github.com/apache/kafka/pull/6164 Regards, Ivan Ponomarev