[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725357#comment-17725357 ]

Matthias J. Sax commented on KAFKA-7497:

Seems to be fixed. Cf. https://issues.apache.org/jira/browse/KAFKA-14209

> Kafka Streams should support self-join on streams
>
> Key: KAFKA-7497
> URL: https://issues.apache.org/jira/browse/KAFKA-7497
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Robin Moffatt
> Priority: Major
> Labels: needs-kip
>
> There are valid reasons to want to join a stream to itself, but Kafka Streams
> does not currently support this ({{Invalid topology: Topic foo has already
> been registered by another source.}}). To perform the join requires creating
> a second stream as a clone of the first, and then doing a join between the
> two. This is a clunky workaround and results in unnecessary duplication of
> data.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552538#comment-17552538 ]

Rohit Verma commented on KAFKA-7497:

In fact, our use case also involves multiple event types in the same topic, as described in [https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/]. As suggested in that example, if I put Customer and Order events in the same topic/partition, I want to do a self-join to get customerOrders too.
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17396307#comment-17396307 ]

Matthias J. Sax commented on KAFKA-7497:

Regarding my comment above:
{quote}Also, and this seems to be the most severe issue, each record would join with itself, which is actually not desired...{quote}
I think this is actually not correct. At least if we consider self-joins in standard SQL, a record does join with itself. We should follow the same semantics; thus, it is already possible today (even if not efficient) to do a self-join with Kafka Streams.
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743468#comment-16743468 ]

Guozhang Wang commented on KAFKA-7497:

From the expressiveness of the operators, I think there are still cases of stream self-join that cannot be captured with stream aggregations, since the window is really "sliding" (but if we add sliding-window aggregations, they may be equivalent to the semantics of a stream self-join).

From the API point of view, I think allowing stream self-joins still provides programmability benefits, even assuming their use cases can be captured with sliding-window aggregations. But the underlying implementation should be different from any of our current internal implementations. I think we can still have an umbrella KIP that includes the following:

1. Add sliding-window-based aggregations.
2. Allow windowed stream self-joins; when one is detected, convert it to a sliding-window-based aggregation behind the scenes for an efficient implementation.
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743329#comment-16743329 ]

John Roesler commented on KAFKA-7497:

Ah. I was looking at the javadoc on `KStream#join`. My bad.
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743249#comment-16743249 ]

Matthias J. Sax commented on KAFKA-7497:

Your understanding seems correct. Also note the JavaDocs of `JoinWindows`:
{quote}
{{* In SQL-style you would express this join as}}
{{* }}
{{* SELECT * FROM stream1, stream2}}
{{* WHERE}}
{{* stream1.key = stream2.key}}
{{* AND}}
{{* stream1.ts - before <= stream2.ts AND stream2.ts <= stream1.ts + after}}
{quote}
I agree, it's not necessarily a public API change. However, it might still be a major change that we want to back up with a KIP. Not sure. In the end, it's an optimization to avoid two state stores, because one state store should be sufficient to compute the self-join.
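The SQL-style condition quoted from the `JoinWindows` JavaDocs can be spelled out as a brute-force nested loop. This is a minimal in-memory sketch, not how Kafka Streams evaluates joins: the `Event` record, the `windowJoin` method, and passing `before`/`after` as plain millisecond values are illustrative assumptions. Passing the same list on both sides models the self-join case.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowJoinSketch {
    // Hypothetical event type: record key, value, and event-time timestamp in ms.
    record Event(String key, String value, long ts) {}

    // Brute-force evaluation of the JoinWindows condition:
    //   left.key = right.key
    //   AND left.ts - before <= right.ts AND right.ts <= left.ts + after
    static List<String> windowJoin(List<Event> left, List<Event> right,
                                   long before, long after) {
        List<String> results = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key().equals(r.key());
                boolean inWindow = l.ts() - before <= r.ts() && r.ts() <= l.ts() + after;
                if (sameKey && inWindow) {
                    results.add(l.value() + "+" + r.value());
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> stream = List.of(
                new Event("A", "a1", 0L),
                new Event("A", "a2", 4_000L),
                new Event("B", "b1", 1_000L));
        // Self-join: the same list on both sides, 5 s before and after.
        System.out.println(windowJoin(stream, stream, 5_000L, 5_000L));
        // prints [a1+a1, a1+a2, a2+a1, a2+a2, b1+b1]
    }
}
```

With the same list on both sides, every record matches itself (`a1+a1`), as in a SQL self-join, and each in-window pair of distinct records appears once in each order. With `before = after = w`, the condition reduces to `abs(l.ts - r.ts) <= w`.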
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16743225#comment-16743225 ]

John Roesler commented on KAFKA-7497:

Thanks [~mjsax], I see that the "key" field in Kafka can be set to anything. My question was about the semantics of a stream-stream join. I've read our javadoc, and all it says is that it does an "inner equi-join" restricted by the time window. I guess this means that, given two streams `U` and `V`, it produces at least one result pair `(ui, vj)` for each pair in the cartesian product of the streams such that `ui.key == vj.key` and `abs(ui.time - vj.time) <= window_size`.

Under this definition, if we happen to set `V := U`, the operation is still well defined. It sounds like this is the precise ask, since at the moment choosing `V := U` throws a runtime error, even though it's not semantically prohibited.

It does seem like part of the scope of work should be to implement it efficiently, that is, to detect at topology-build time that both streams are actually the same and ensure that we only need one join window store.

If I understand this scoping correctly, there's no public API change, just a behavior change. Also, since it's currently not possible to start a topology with a stream self-join, no deprecation or migration plan is needed. Therefore, no KIP is required. Sound good?
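To make the single-store idea concrete, here is a hedged sketch of a self-join processor that keeps one per-key buffer instead of two identical window stores. It is not the implementation that later landed under KAFKA-14209; the class, its `process` method, and the absence of any retention or grace handling (old records are never evicted) are simplifying assumptions. For the symmetric window `abs(ui.time - vj.time) <= window_size`, it emits the same pairs as joining `U` with itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SingleStoreSelfJoin {
    // Hypothetical event type: record key, value, and event-time timestamp in ms.
    record Event(String key, String value, long ts) {}

    private final long windowMs;
    // The single "window store": previously seen events, grouped by key.
    // Simplification: nothing is ever evicted, so memory grows without bound.
    private final Map<String, List<Event>> store = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    SingleStoreSelfJoin(long windowMs) {
        this.windowMs = windowMs;
    }

    void process(Event e) {
        List<Event> seen = store.computeIfAbsent(e.key(), k -> new ArrayList<>());
        for (Event other : seen) {
            if (Math.abs(e.ts() - other.ts()) <= windowMs) {
                // A two-sided join of U with itself produces both orientations.
                output.add(other.value() + "+" + e.value());
                output.add(e.value() + "+" + other.value());
            }
        }
        // Standard SQL self-join semantics: a record also joins with itself.
        output.add(e.value() + "+" + e.value());
        seen.add(e);
    }

    List<String> output() {
        return output;
    }
}
```

Processing `a1@0`, `b1@1000`, `a2@4000` with a 5 s window yields `a1+a1`, `b1+b1`, `a1+a2`, `a2+a1`, `a2+a2`. One buffer suffices because each new record only has to be matched against records already stored under the same key.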
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16742733#comment-16742733 ]

Matthias J. Sax commented on KAFKA-7497:

I disagree with "that you have a stream of unique events": the join condition is defined on the record key, but the record key is not a primary key for streams. For example, you can have a stream of clicks using the page-id as key. Also note that each record might join multiple times, not just once.
{quote}one side of a pair may be arbitrarily delayed or disordered, which leads to the need for memory on one or both sides of the join.{quote}
Not sure what you mean by this. If you are referring to the join window, I think these are two different things. "Delay" or "disorder" seem to refer to wall-clock time, but the join is defined on event time. Thus, the semantics are to join events that happen temporally close to each other. This translates to the self-join case too: in the clickstream example with page-id as key, it means returning all pages for which there is more than one click within the time window. I don't think this is related to similarity joins at all.
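The clickstream reading can be sketched in plain Java: self-join clicks on page-id within a time window, drop the trivial pair of a click with itself, and keep the pages that still have a match. The `Click` record (with a hypothetical `id` field to tell apart two clicks with the same key and timestamp) and the `pagesWithRepeatClicks` method are illustrative assumptions, not part of any Kafka API.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class ClickSelfJoin {
    // Hypothetical click event; `id` distinguishes clicks with equal page-id and timestamp.
    record Click(long id, String pageId, long ts) {}

    // Returns the pages that received more than one click within windowMs of each other.
    static Set<String> pagesWithRepeatClicks(List<Click> clicks, long windowMs) {
        Set<String> pages = new TreeSet<>();
        for (Click l : clicks) {
            for (Click r : clicks) {
                boolean samePage = l.pageId().equals(r.pageId());
                boolean inWindow = Math.abs(l.ts() - r.ts()) <= windowMs;
                // Under SQL semantics (l, l) also matches; skip it so that a
                // single click is not reported as a repeat.
                if (samePage && inWindow && l.id() != r.id()) {
                    pages.add(l.pageId());
                }
            }
        }
        return pages;
    }
}
```

With a 60 s window, two `home` clicks 30 s apart report `home`, while a lone `about` click and two `cart` clicks 120 s apart report nothing.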
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16742729#comment-16742729 ]

John Roesler commented on KAFKA-7497:

Note, the use-case reference above is now at: [https://github.com/confluentinc/demo-scene/blob/master/ksql-atm-fraud-detection/ksql-atm-fraud-detection-README.adoc]

I _think_ I see the rationale of this ask. I guess the difference between a windowed aggregation and a self-join is that the windowed aggregation would require you to save all the occurrences of the key in the window and do your computation (which is a pairwise computation) over the collection, whereas the self-join naturally gives you a stream of all the pairwise matches whenever the same key re-occurs within the window. IIUC, you can do the same computation either way, but it's more naturally expressed as a pairwise comparison, so the self-join is more ergonomic? (Although, if there are two fraudulent transactions on the same account, they would show up in the existing program as two independent potential frauds, whereas the aggregation method gives you the opportunity to generate just one fraud report with two occurrences.)

On the other hand, I'm struggling to see the semantics of this feature clearly. As I understand it, the semantics of a stream-stream join in general are that you have a stream of unique events U=<u1, u2, u3> and another stream of unique events V=<v1, v2, v3>, and you effectively want to "zip" them to produce J=<(u1,v1), (u2,v2), (u3,v3)>. Note that the indices are the identifying key, and they are unique. The standard stream-processing complications apply: one side of a pair may be arbitrarily delayed or disordered, which leads to the need for memory on one or both sides of the join.
The use case described in the document linked above seems different: the identifying information isn't unique, in that the same account id appears on an arbitrary number of events in the stream, but the account id is what we use as the joining key. It sounds similar to the "streaming similarity self-join" problem, which I found while researching this issue: [http://www.vldb.org/pvldb/vol9/p792-defranciscimorales.pdf]. Under that definition, the "join" is actually just a cartesian product of every record in both streams, and for each pair you compute a similarity, keeping only the pairs that have an above-threshold similarity score and discarding the rest. There are a number of relaxations/optimizations required to do this efficiently, which in our case means limiting the product to only those records that already share the same account id, and of course limiting the join temporally.

I suppose my question after looking into this is (despite the shared name and the fact that the existing implementation happens to suit both): are these two operations really the same concept? If we say "yes", then we may prohibit future optimizations. For example, in the former stream-stream join, you know that the events' keys are unique, so once you produce a pair, you can immediately forget both of its input events. But for the similarity join, you have to remember the input events until the predefined join window closes.
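The trade-off between the two formulations can be sketched on hypothetical ATM data: the self-join style emits one alert per suspicious pair, while the aggregation style first collects each account's transactions and then runs the same pairwise check over the collection, yielding one report per account. The `Txn` record and both methods are illustrative assumptions; the "different location within the window" rule is only a stand-in for a real fraud condition, and windows are never expired.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class FraudSketch {
    // Hypothetical ATM transaction: account id, ATM location, event time in ms.
    record Txn(String account, String location, long ts) {}

    // Self-join style: one alert per pair of transactions on the same account at
    // different locations within windowMs (each unordered pair reported once).
    static List<String> pairAlerts(List<Txn> txns, long windowMs) {
        List<String> alerts = new ArrayList<>();
        for (int i = 0; i < txns.size(); i++) {
            for (int j = i + 1; j < txns.size(); j++) {
                Txn a = txns.get(i);
                Txn b = txns.get(j);
                if (a.account().equals(b.account())
                        && !a.location().equals(b.location())
                        && Math.abs(a.ts() - b.ts()) <= windowMs) {
                    alerts.add(a.account() + ":" + a.location() + "/" + b.location());
                }
            }
        }
        return alerts;
    }

    // Aggregation style: collect every transaction per account, then run the
    // pairwise check over each collection and emit one report per account.
    static Map<String, List<String>> perAccountReports(List<Txn> txns, long windowMs) {
        Map<String, List<Txn>> byAccount = new TreeMap<>();
        for (Txn t : txns) {
            byAccount.computeIfAbsent(t.account(), k -> new ArrayList<>()).add(t);
        }
        Map<String, List<String>> reports = new TreeMap<>();
        for (Map.Entry<String, List<Txn>> e : byAccount.entrySet()) {
            List<String> suspicious = pairAlerts(e.getValue(), windowMs);
            if (!suspicious.isEmpty()) {
                reports.put(e.getKey(), suspicious);
            }
        }
        return reports;
    }
}
```

For one account with transactions in London, Paris, then London again within five minutes, `pairAlerts` yields two separate alerts, whereas `perAccountReports` yields a single report for that account containing both.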
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648105#comment-16648105 ]

Guozhang Wang commented on KAFKA-7497:

[~mjsax] You're right; I was only thinking about use cases where the stream needs some enrichment that is independent of any other streams/tables. I also agree with you that, even with sliding windows, windowed aggregations still seem to be sufficient.

[~rmoff] I looked at the use case, and it seems to be a better fit for "session window" aggregations on a KStream than for a KStream self-join: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/SessionWindows.html
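For reference, session-window grouping can be approximated in memory: per key, events sorted by event time fall into the same session as long as the gap to the previous event does not exceed an inactivity gap. This is a plain-Java sketch of that idea under the assumption that all events are available up front; it is not the `SessionWindows` implementation, and the `Txn` record and `sessionize` method are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SessionSketch {
    // Hypothetical transaction: account id (the record key) and event time in ms.
    record Txn(String account, long ts) {}

    // Groups each account's timestamps into sessions; a new session starts when the
    // gap to the previous event (in event-time order) exceeds inactivityGapMs.
    static Map<String, List<List<Long>>> sessionize(List<Txn> txns, long inactivityGapMs) {
        Map<String, List<Long>> byAccount = new TreeMap<>();
        for (Txn t : txns) {
            byAccount.computeIfAbsent(t.account(), k -> new ArrayList<>()).add(t.ts());
        }
        Map<String, List<List<Long>>> sessions = new TreeMap<>();
        for (Map.Entry<String, List<Long>> e : byAccount.entrySet()) {
            List<Long> times = e.getValue();
            Collections.sort(times);
            List<List<Long>> accountSessions = new ArrayList<>();
            List<Long> current = new ArrayList<>();
            for (long ts : times) {
                if (!current.isEmpty() && ts - current.get(current.size() - 1) > inactivityGapMs) {
                    accountSessions.add(current);
                    current = new ArrayList<>();
                }
                current.add(ts);
            }
            accountSessions.add(current); // never empty: each account has at least one event
            sessions.put(e.getKey(), accountSessions);
        }
        return sessions;
    }
}
```

With a 5-minute gap, events at 0 s and 60 s on one account form a single session, and a later event at 600 s starts a new one.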
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647671#comment-16647671 ]

Robin Moffatt commented on KAFKA-7497:

Here's the use case, inspecting a stream of transactions looking for possible fraud: https://github.com/confluentinc/demo-scene/blob/master/ksql-atm-fraud-detection/ksql-atm-fraud-detection.adoc
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647138#comment-16647138 ]

Matthias J. Sax commented on KAFKA-7497:

[~guozhang] I cannot follow. A self-join would still have a sliding join window, and thus all records with the same key within the window would be joined. How can a stateless `map` achieve this?

I want to add that a sliding-window aggregation might allow computing the same thing. Note, though, that Kafka Streams currently only supports hopping/tumbling windows for aggregations, not sliding windows.

As for the use case: to be honest, this is also unclear to me.
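The remark that a sliding-window aggregation might compute the same thing can be checked on a small example: counting, for each record, the same-key records in `[ts - window, ts]` gives the same numbers as counting that record's matches in a self-join with `before = window` and `after = 0` (the record itself included). The sketch assumes strictly increasing timestamps per key and in-order arrival (ties would make the two counts differ). The per-key deque is one common way to maintain such a window, not necessarily how Kafka Streams would do it, and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SlidingVsSelfJoin {
    // Hypothetical event type: record key and event-time timestamp in ms.
    record Event(String key, long ts) {}

    // Sliding-window count: for each record, how many same-key records fall in
    // [ts - windowMs, ts]. Assumes events arrive in strictly increasing ts order.
    static List<Integer> slidingCounts(List<Event> events, long windowMs) {
        Map<String, ArrayDeque<Long>> windows = new HashMap<>();
        List<Integer> counts = new ArrayList<>();
        for (Event e : events) {
            ArrayDeque<Long> w = windows.computeIfAbsent(e.key(), k -> new ArrayDeque<>());
            w.addLast(e.ts());
            // Evict timestamps that slid out of the window (never e itself, for windowMs >= 0).
            while (w.peekFirst() < e.ts() - windowMs) {
                w.pollFirst();
            }
            counts.add(w.size());
        }
        return counts;
    }

    // Self-join view: for each left record, count right records (including itself)
    // with the same key and left.ts - windowMs <= right.ts <= left.ts + 0.
    static List<Integer> selfJoinMatchCounts(List<Event> events, long windowMs) {
        List<Integer> counts = new ArrayList<>();
        for (Event l : events) {
            int matches = 0;
            for (Event r : events) {
                if (l.key().equals(r.key()) && l.ts() - windowMs <= r.ts() && r.ts() <= l.ts()) {
                    matches++;
                }
            }
            counts.add(matches);
        }
        return counts;
    }
}
```

The deque version adds and evicts each timestamp at most once, whereas the nested loop compares every pair of records.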
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647044#comment-16647044 ]

Guozhang Wang commented on KAFKA-7497:

[~rmoff] Before we dive into the implementation details, could you list the motivations for self-joins in Streams? Note that since stream joins currently only support key-based joins, a self-join should be well covered by just "enriching" the stream record via `map` etc.
[jira] [Commented] (KAFKA-7497) Kafka Streams should support self-join on streams
[ https://issues.apache.org/jira/browse/KAFKA-7497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646892#comment-16646892 ]

Matthias J. Sax commented on KAFKA-7497:

It is correct that a topic cannot be consumed twice (cf. https://issues.apache.org/jira/browse/KAFKA-6687), and I also agree that a self-join operator would be useful. One could express a self-join like this:
{noformat}
KStream stream = builder.stream(...);
stream.join(stream, ...);
{noformat}
However, the execution of the join would not be efficient, as two state stores with two changelog topics would be created (both containing exactly the same data). Also, and this seems to be the most severe issue, each record would join with itself, which is actually not desired...

I marked this as "needs-kip", but I am not 100% sure we would need a KIP. Maybe Kafka Streams could detect internally that the left-hand-side KStream and the right-hand-side KStream are the same object and just use a different operator implementation (i.e., a dedicated self-join processor). This way, no public API change would be required.