[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827015#comment-15827015 ]

Paul Vaughan commented on KAFKA-3705:
-------------------------------------

Healthcare data is intrinsically relational, but the bulk of the data is related to patients in the sense that if the patient did not exist, the data would not exist. For example, these FHIR Resources all depend on a Patient: Encounter, MedicationOrder, Observation, Condition, etc., while Practitioner, Organization, etc. exist independent of patients. This suggests that a good partitioning strategy for processing this data would be to partition by patient in order to get good concurrency with minimal repartitioning/distribution. Data that is independent of the patient would likely need to be distributed to all nodes, but that data is relatively small.

Processing this data includes checking for referential integrity, dealing with out-of-order data (Encounters that are received before the Patient is received), and re-keying. For example, when an Encounter arrives, a downstream version of that Encounter needs to be created with the patient’s downstream key. Similarly, when a new Patient arrives it should be given a downstream key, and any Encounters that reference this patient need to be updated and sent downstream.

But the data is also intrinsically keyed by something other than the patient. For example, an Encounter has a key, and it is possible to get duplicate copies of an Encounter, either as corrections or simple duplicates. Thus it is desirable to use the intrinsic key with compacted topics, rather than using the patient as the Kafka topic key.

While it is possible to key by one thing and partition by another using explicit partitioners, that seems both error prone and insufficient to keep the data only where it needs to be. Specifically, the high-level Streams DSL does not seem to support the latter point. Without the foreign key support discussed here, it is necessary to do aggregation and remapping that cause implicit repartitioning. The DSL seems determined to move the data around. It is not clear to me whether this KIP would eliminate that problem. Note that I found the documentation frustrating: this repartitioning was not apparent from it, and was most apparent from looking at the set of topics that get implicitly created.

I would like to see the ability to transform a set of related incoming topics into a set of downstream topics, including re-keying and sometimes renormalization, using the high-level Streams DSL. This seems like a start towards that, but is it sufficient?

> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: api
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users
> want to join a KTable A by key {{a}} with another KTable B by key {{b}} but
> with a "foreign key" {{a}}, and assuming they are read from two topics which
> are partitioned on {{a}} and {{b}} respectively, they need to do the
> following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already
> partitioned on {{a}}, users still need to do the pre-aggregation in order to
> make the two joining streams to be on the same key. This is a drawback for
> programmability and we should fix it.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
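The workaround pattern quoted in the issue description can be illustrated without Kafka. The following is a minimal sketch in plain Java over in-memory collections, not the Kafka Streams API; the class, the `BRow` type, and both method names are hypothetical. Table B (keyed by `b`) is first regrouped by its foreign-key field `a`, and only then joined with table A by key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names come from Kafka Streams.
class RekeyThenJoinSketch {

    // A row of table B: its own key "b" plus the foreign key "a" pointing into table A.
    static final class BRow {
        final String b;
        final String a;
        BRow(String b, String a) { this.b = b; this.a = a; }
    }

    // Step 1, the groupBy(...).agg(...) part: regroup B by its foreign key "a".
    static Map<String, List<String>> groupByForeignKey(List<BRow> tableB) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (BRow row : tableB) {
            grouped.computeIfAbsent(row.a, k -> new ArrayList<>()).add(row.b);
        }
        return grouped;
    }

    // Step 2, the tableA.join(tableB', joiner) part: both sides are now keyed on "a".
    static Map<String, String> join(Map<String, String> tableA,
                                    Map<String, List<String>> tableBPrime) {
        Map<String, String> joined = new TreeMap<>();
        for (Map.Entry<String, String> e : tableA.entrySet()) {
            List<String> bs = tableBPrime.get(e.getKey());
            if (bs != null) { // inner join: keys without a match are skipped
                joined.put(e.getKey(), e.getValue() + " <- " + bs);
            }
        }
        return joined;
    }
}
```

In a distributed setting, step 1 is where the data has to move: rows of B with the same `a` may live on different partitions, which is why the pattern implies a repartition.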
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826346#comment-15826346 ]

Jan Filipiak commented on KAFKA-3705:
-------------------------------------

AFAICS the table will also be filled by a different thread. IMO that makes it too hard to reason about the execution: if you look into the table twice while processing a record, it might have different values. :-( GlobalKTable makes me sad.
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826332#comment-15826332 ]

Jan Filipiak commented on KAFKA-3705:
-------------------------------------

Hi, with regard to what I am trying to do here, the GlobalKTable is not useful. It could be useful if its source also emitted change events. (I would still have to do the range lookup, but I could save a repartition of my "bigger" table.) The current design is not usable here. :'(
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15826064#comment-15826064 ]

Michael Noll commented on KAFKA-3705:
-------------------------------------

[~jfilipiak]: Now that support for global KTables is around the corner (see KIP-99 at https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649), would that serve all / most of your needs here? I am aware that "non-key joining in KTable" and "global KTables" do not fully overlap, but the overlap is still quite significant.
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396508#comment-15396508 ]

Guozhang Wang commented on KAFKA-3705:
--------------------------------------

Yeah, currently we do not have a windowed-stream table join, and hence the stream is not materialized but only the table. We can add this join type in the next release, though, if we feel it is a common request.

> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396498#comment-15396498 ]

Jan Filipiak commented on KAFKA-3705:
-------------------------------------

The change doesn't seem to be in all the places. I thought about going with Ktable,T> but also settled on Ktable; the other thing would just be generic overkill, even though one can hide it from the user by having a PairSerde or something.

Regarding your idea, I kind of fail to see how an update to table1 or table2 would be reflected in the output with regard to republishing. Isn't the only option there to have the stream materialized based on a window? I need to take a deeper look, though.
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396491#comment-15396491 ]

Guozhang Wang commented on KAFKA-3705:
--------------------------------------

We are fixing this issue right now in the consumer layer with KIP-62: https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395478#comment-15395478 ]

Jan Filipiak commented on KAFKA-3705:
-------------------------------------

Something that has started happening to us is that for low-cardinality columns on the join, the prefix scan on RocksDB can return a large number of values. That leads to too much time spent between poll() calls and to us losing group membership. One could maybe check whether the consumer needs a poll() during context.forward(). The fix we are currently using, setting the session timeout very high, is not that good IMO.
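For reference, the consumer settings involved in this kind of workaround can be sketched as follows. The property names are standard Kafka consumer configs; the class name and all values are hypothetical and would need tuning to how long one prefix scan and its forwards actually take. Note that `max.poll.interval.ms` is only understood by clients from 0.10.1 on (it was introduced by KIP-62), and that the broker caps the session timeout via `group.max.session.timeout.ms`.

```java
import java.util.Properties;

// Hypothetical values; only the property names are real Kafka consumer configs.
class PollTimeoutConfigSketch {
    static Properties consumerOverrides() {
        Properties props = new Properties();
        // Fewer records per poll() means less work between two poll() calls.
        props.setProperty("max.poll.records", "50");
        // The workaround from the comment: a generous session timeout.
        props.setProperty("session.timeout.ms", "120000");
        // From 0.10.1 on (KIP-62): upper bound on the time between two poll() calls,
        // decoupled from the heartbeat-based session timeout.
        props.setProperty("max.poll.interval.ms", "600000");
        return props;
    }
}
```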
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15365365#comment-15365365 ]

Guozhang Wang commented on KAFKA-3705:
--------------------------------------

[~jfilipiak] After talking with you offline, I am convinced that this combo-key is necessary to avoid out-of-order results. I have updated the design proposal wiki accordingly: https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins, feel free to take a look.

Just a random thought on your use case specifically: do relations A, B, and C all need to be captured as KTables (i.e. the records as binlog / etc. from some database table)? If the one with the foreign keys can be captured as just a stream (i.e. a KStream), then you can re-model your computation as {{(stream Join table1) Join table2}}, where {{stream Join table}} returns a stream. In the Kafka Streams DSL you can then just do {{stream.selectKey(table1.key).join(table1).selectKey(table2.key).join(table2)}}.
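The re-modelled computation {{(stream Join table1) Join table2}} can be pictured with a small plain-Java sketch. It uses in-memory maps in place of the two tables and is not Kafka Streams code; the `Event` shape and the `enrich` method are hypothetical. Each stream record is re-keyed by one foreign key, looked up in the first table, re-keyed by the other foreign key, and looked up in the second table.

```java
import java.util.Map;

// Plain-Java illustration only; table1 and table2 are stand-ins for KTables.
class StreamTableChainSketch {

    // A stream record carrying foreign keys into table1 and table2 (hypothetical shape).
    static final class Event {
        final String fk1;
        final String fk2;
        final String payload;
        Event(String fk1, String fk2, String payload) {
            this.fk1 = fk1;
            this.fk2 = fk2;
            this.payload = payload;
        }
    }

    // Returns null when either lookup misses, mirroring an inner join.
    static String enrich(Event e, Map<String, String> table1, Map<String, String> table2) {
        String v1 = table1.get(e.fk1); // "selectKey(table1.key).join(table1)"
        if (v1 == null) {
            return null;
        }
        String v2 = table2.get(e.fk2); // "selectKey(table2.key).join(table2)"
        if (v2 == null) {
            return null;
        }
        return e.payload + "+" + v1 + "+" + v2;
    }
}
```

In this sketch a record is enriched with whatever the tables hold at the moment it is processed; a later change to either table does not revisit records that have already passed through.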
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359835#comment-15359835 ]

Guozhang Wang commented on KAFKA-3705:
--------------------------------------

Yes, this does finally clarify your scenario, thanks! I think the change<> pair can still help in your case, because for aggregations, for example, it gives you the clear information "subtract the old value, and add the new value" instead of depending on whether the returned value is null. For example, the Streams DSL defines the aggregation operator in the following way (your customized implementation does not need to follow the same pattern strictly; this is just to illustrate the idea):

{code}
KTable aggregate(Initializer initializer, Aggregator adder, Aggregator substractor, ...);
{code}

Let me try again with your example code and the aggregation pattern in the above Streams DSL, and if you do not agree let's have a small skype chat :)

1. Suppose your current value in Table B is B.PK => B.V.old, which contains A.PK.old, C.PK, etc. When you join tables A and B, you repartition the stream B by A.PK while still maintaining the message format as B.PK => B.V, and the join result is in the format of:

B.PK => join

2. Now suppose you have an update on Table B, as B.PK => B.V.new, which contains A.PK.new, C.PK (same value), etc. And suppose it is represented as a change pair of {old, new}, i.e.

B.PK => {B.V.old, B.V.new}, or more specifically:
B.PK => {, }

3. When you repartition it based on the A.PK value, this will result in two pairs being sent to potentially two different partitions, as:

B.PK => {B.V.old, null} (sent to partition1)
B.PK => {null, B.V.new} (sent to partition2)

4. These two records will be joined independently at two processors, each fetching one partition of the re-partitioned topic, and the result is:

B.PK => {joined(B.V.old, A.V.old), null} (here A.V.old corresponds to the value for key A.PK.old in Table A)
B.PK => {null, joined(B.V.new, A.V.new)} (here A.V.new corresponds to the value for key A.PK.new in Table A)

They will then be sent to the second topic, which is partitioned on C.PK. Since their C.PK value is the same, they will be sent to the same partition, but in arbitrary order.

5. The aggregation function consumes from the second re-partition topic based on C.PK, and does the aggregation by 1) calling a subtract function on the old value of the pair, and then 2) calling an add function on the new value of the pair; if the value is null, that call is skipped. More specifically, the subtract / add functions look like:

{code}
List> subtractor.apply(C key, Joined value, List> current) {
    current.remove(key)
    return m.entrySet.asList
}

List> adder.apply(C key, Joined value, List> current) {
    current.put(key, value)
    return m.entrySet.asList
}
{code}

Based on the order in which these two records are received, we will either call {{subtract(C.PK, joined(B.V.old, A.V.old), current)}} first and then {{add(C.PK, joined(B.V.new, A.V.new), current)}}, or vice versa. Either way it is correct, since {{B.V.old}} and {{B.V.new}} are different keys.
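The order-independence claimed in step 5 can be checked with a small plain-Java sketch. It is not Kafka Streams code: the `Change` type and the `apply` / `replay` methods are hypothetical, and the aggregate is modelled as a set whose entries are identified by the joined value itself, which is the assumption that makes the old and new halves distinct entries.

```java
import java.util.Set;
import java.util.TreeSet;

// Plain-Java illustration only; the aggregate for one C.PK is modelled as a set.
class ChangePairAggregationSketch {

    // One half of a split change pair: exactly one of oldValue / newValue is non-null.
    static final class Change {
        final String oldValue;
        final String newValue;
        Change(String oldValue, String newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    // Subtract the old value if present, then add the new value if present.
    static void apply(Set<String> aggregate, Change change) {
        if (change.oldValue != null) {
            aggregate.remove(change.oldValue);
        }
        if (change.newValue != null) {
            aggregate.add(change.newValue);
        }
    }

    // Applies the changes in the given order to a copy of the initial aggregate.
    static Set<String> replay(Set<String> initial, Change... changes) {
        Set<String> aggregate = new TreeSet<>(initial);
        for (Change c : changes) {
            apply(aggregate, c);
        }
        return aggregate;
    }
}
```

Replaying the retraction and the addition in either order leaves only the new joined value in the set. The result depends on the two halves addressing different entries; if both addressed the same entry, arrival order would matter.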
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358566#comment-15358566 ]

Jan Filipiak commented on KAFKA-3705:
-------------------------------------

Hi, yes, that is kinda where I am coming from. I completely understand where you are. Doing the changelog case (logging Change<> objects) is just one implementation of this repartitioning, and mine is another one. I am very familiar with my approach, as I wrote some Samza apps using it. It has many benefits that may or may not be of interest (repartition topics can also be used to bootstrap, fewer copies of the data (no need to make state HA, see previous), etc.).

What we are still missing here is a mutual understanding of what I think key widening does and how to expose that to users in a non-insane manner. Maybe I can try it with your JSON syntax. This is the very example we have, and this ticket's feature would allow me to build it at the DSL level of the API.

So let's say I have 3 tables: A, B, C. I want to reach a point where I have C => >. This will then be read by our application servers and serves them as a faster way to retrieve this than, let's say, the original MySQL. B has foreign keys into A and C. All tables start off as one topic each, keyed by the table's primary key:

Topic mysq__jadajadajada_A A.PK => A
Topic mysq_B B.PK => B
Topic mysq_C C.PK => C

I am going to repartition B to A.PK now, in the first example without a widened key. Then it stays B.PK => B but is partitioned by A.PK accordingly. Then I can do the join with A and get B.PK => joined, as of your previous comment:

{quote}
Then a join result of {a="a1", joined = join("a1-pre", "c1")}
{quote}

Note that the key stays B.PK (unwidened). Now I am going to repartition based on C.PK, still maintaining B.PK => joined as the topic layout. Now shit hits the fan, as I am doing my aggregation to become C.PK => List>. How would this aggregator look now?

{code:java}
List> apply(B key, Joined value, List> current) {
    Map m = listToMap(current, bKeyExtractorValueMapper,B.PK>);
    if (value == null) {
        m.remove(key);
    } else {
        m.put(key, value);
    }
    return m.entrySet.asList;
}
{code}

This wouldn't be much different with logged Changes>; only the remove and add would be two methods. The problem is that it doesn't look wrong, but this code now has race conditions. Think about an update to the A.PK field of a B record that forces it to switch partitions (the C.PK value remains). Then we publish a delete to the old partition and the new value to the new partition. Then we do the join. Then we repartition on the unchanged C.PK. This will make our code above see

B.PK => null /remove
B.PK => Joined /add

in no particular order. Hence the output is undefined. If the API had forcefully widened the key to be Joined, the error would not happen and users would be aware of what happens on repartitioning. I thought this through, and it also happens with logging Change<>, as it is really just another implementation.

I hope this finally clarifies the key widening I am talking about. If not, maybe we should have a small skype or something. My further recommendation is to not implement these joins as logged Changes<>, as that is just more resource intensive and less efficient, and also makes the API more complicated.

PS.: Hive has seen all join types with MapJoins, Skewed Joins, you name it. All of these are applicable to streams as well. Maybe keep them in the back of your head.
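The race described above, and the effect of widening the key, can be reproduced in a few lines of plain Java. This is a minimal sketch, not Kafka Streams code: the class and method names are hypothetical, records are modelled as `{key, value}` string pairs with a null value meaning delete, and the widened key is written as the string `B.PK|A.PK`.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; a null value stands for a delete.
class KeyWideningRaceSketch {

    // The aggregator from the comment: null means remove, anything else means upsert.
    static void apply(Map<String, String> aggregate, String key, String value) {
        if (value == null) {
            aggregate.remove(key);
        } else {
            aggregate.put(key, value);
        }
    }

    // Applies {key, value} records in the given order to a copy of the initial aggregate.
    static Map<String, String> replay(Map<String, String> initial, String[][] records) {
        Map<String, String> aggregate = new TreeMap<>(initial);
        for (String[] record : records) {
            apply(aggregate, record[0], record[1]);
        }
        return aggregate;
    }
}
```

With the unwidened key, the delete `{"b1", null}` and the add `{"b1", "joined(new)"}` hit the same entry, so delete-then-add leaves the new value while add-then-delete leaves nothing. With the widened keys `"b1|a-old"` and `"b1|a-new"` the two records touch different entries, and both orders end with only the new entry.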
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357696#comment-15357696 ]

Guozhang Wang commented on KAFKA-3705:
--------------------------------------

Yeah, this clarifies a lot. I see that you are not trying to create a "table" inside Kafka Streams, but just want to re-partition the input binlog stream, and that you are working with the lower-level APIs (originally I was confused since you mentioned "KTable", which is only available in the higher-level DSL). Here is my understanding / suggestions:

1. The original Kafka topic that was directly piped from your MySQL binlog has the form (key -> value)

a -> {a', other-fields}

and you want to repartition it into a new topic on field a', which is also log compacted on a'.

2. So instead of representing a delete / update record as "a -> null" / "a -> a'-new, other-fields-new", you can send the messages in a different format (there are already a few tools, including Kafka Connect, which allow you to represent your binlog entries with such flexibility):

a -> {pair{a'-old, a'-new}, other-fields-current-value}

so that a deletion becomes:

a -> {pair{a'-old, null}, other-fields}

3. In this case, you can access the value field to extract a' for partitioning even for deletions. For an update record, you can then send two records to the re-partition topic, as follows:

a'-old -> null,
a'-new -> other-fields.

Will that work for you?
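Step 3 of the suggestion above can be sketched as a small function. This is plain Java with hypothetical names, not Kafka Connect or Kafka Streams code; output records are modelled as `{key, value}` string pairs and a null value stands for a tombstone in the compacted re-partition topic. Skipping the tombstone when a' is unchanged is an assumption added here, not something stated in the comment.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; each output is {key, value}, null value = tombstone.
class BinlogPairRepartitionSketch {

    // oldFk / newFk are the a'-old / a'-new halves of the pair; either may be null.
    static List<String[]> toRepartitionRecords(String oldFk, String newFk, String otherFields) {
        List<String[]> out = new ArrayList<>();
        if (oldFk != null && !oldFk.equals(newFk)) {
            out.add(new String[] {oldFk, null});        // retract under the old a'
        }
        if (newFk != null) {
            out.add(new String[] {newFk, otherFields}); // publish under the new a'
        }
        return out;
    }
}
```

An update that changes a' yields a tombstone plus a new record, a delete yields only the tombstone, and an insert yields only the new record.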
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15352103#comment-15352103 ] Jan Filipiak commented on KAFKA-3705: - I will just shoot a quick reply now, time somehow became sparse recently. Anyhow. The bottom line of our misunderstandings is always the same thing. My bad that I didn't see the wiki page, if that Range-query interface is addressed that's nice :D. Point 3 is the one that causes the most confusion I guess. In the repartition case we follow different pathes, where I am not sure that I was able to communicate mine well enough. I <3 the idea of having everything a derived store. ITE all this is beeing used to tail -F mysql-.bin | kafka | XXX | redis, therefore Redis become a derived store of mysql wich can be used for NoSql style reads. I infact am such a great fan of this concept that I tend to treat everything a derived store. For me this means a repartitioned topic is a derived store of the source topic. This stands in contrast to make a changelog out of it and materialize the changelog in say RocksDb. This leads to the "problem" that the changelog topic is not a derived store anymore. Wich gives me a personally bad feeling, it just pushes me out of my comfort zone. Confluent peeps seem to be in their comfort zone with change logging topics. In my narrative shit hits the fan when the property of beeing a derived store is lost. It leads to all the nasty things like beeing in the need of change logging your say RocksDbs as the intermidate topic wont hold stuff forever. In contrast to having a change-logging topic that I re-materialize and then changecapture again, I prefer todo the change capturing first and only maintain the state to wich downstream partitions a record is currently published. This works clean and nicely but brings with it what I call "key widening". 
Say I have KTable A and I want to repartition it to A' so that the topic containing A' is a derived store and log-compacted. Then I can't use Key to do this, for two reasons. First, the StreamPartitioner can only rely on the key to determine the partition to delete from (deletes come as null values), which means the fields that determine the partition need to be in the key no matter what. Snippet:
{code:java}
topology.addSink(name, repartitionTopicName, new StreamPartitioner<K, VR>() {
    private Serializer<KL> intermediateSerializer = intermediateSerde.serializer();

    @Override
    public Integer partition(K key, VR value, int numPartitions) {
        KL newKey = intermideateKeyExtractor.apply(key);
        // Copied from the default partitioner; didn't want to create a Cluster object here to reuse it.
        byte[] keyBytes = intermediateSerializer.serialize(repartitionTopicName, newKey);
        // Clear the sign bit before the modulo so the result stays in [0, numPartitions).
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}, repartitionProcessorName);
{code}
As you can see, the resulting key K contains the KL (the key of the non-repartitioned table).

The second reason why this key must be there is that one needs to be able to build a derived stream A''. Since in A' a record can "move" from partition X to Y, there is a race condition between the "insert" in Y and the delete in X. The repartitioner processor repartitioning for A'' needs to treat them as different keys; if they had the same key, the delete might wipe the new value. This also puts downstream consumers of A'' in the weird position that at any point in time there can be as many A-keys with the same value as there are A' partitions minus 1, or a specific A key might vanish completely and then reappear, which is sometimes awkward to work around in the end application. There are enough strategies to solve at least the multiple-A-keys case, not so much for the complete-vanish case. I hope this clarifies stuff.
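The race described above can be replayed on a toy store. This is only a sketch under stated assumptions: `KeyWideningRaceSketch`, `Change`, `applyNarrow` and `applyWidened` are hypothetical names, keys are plain strings, and the widened key is modelled as `joinKey + "|" + aKey`, which is not necessarily how the real implementation encodes it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy replay of change records arriving from two partitions of A'.
final class KeyWideningRaceSketch {

    // Hypothetical change record: joinKey is the repartition key, aKey the original key of A.
    // A null value is a delete (tombstone).
    static final class Change {
        final String joinKey;
        final String aKey;
        final String value;

        Change(String joinKey, String aKey, String value) {
            this.joinKey = joinKey;
            this.aKey = aKey;
            this.value = value;
        }
    }

    // Downstream store keyed by the original A-key only.
    static Map<String, String> applyNarrow(List<Change> changes) {
        Map<String, String> store = new HashMap<>();
        for (Change c : changes) {
            apply(store, c.aKey, c.value);
        }
        return store;
    }

    // Downstream store keyed by the widened key (join key plus A-key).
    static Map<String, String> applyWidened(List<Change> changes) {
        Map<String, String> store = new HashMap<>();
        for (Change c : changes) {
            apply(store, c.joinKey + "|" + c.aKey, c.value);
        }
        return store;
    }

    private static void apply(Map<String, String> store, String key, String value) {
        if (value == null) {
            store.remove(key); // tombstone
        } else {
            store.put(key, value);
        }
    }
}
```

If the insert for join key `p2` is observed before the delete for join key `p1`, the narrow store ends up empty because the late tombstone removes the new value, while the widened store keeps `p2|k1`. The price is the one named above: until the tombstone arrives, the widened store can hold two entries for the same A-key.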
> Support non-key joining in KTable
> ---------------------------------
>
> Key: KAFKA-3705
> URL: https://issues.apache.org/jira/browse/KAFKA-3705
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Liquan Pei
> Labels: api
> Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users want to join a KTable A by key {{a}} with another KTable B by key {{b}} but with a "foreign key" {{a}}, and assuming they are read from two topics which are partitioned on {{a}} and {{b}} respectively, they need to do the following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340814#comment-15340814 ]

Guozhang Wang commented on KAFKA-3705:
--------------------------------------

Thanks for the feedback!

Re 1: Not sure I fully understand this. I thought you can pass a {{StreamPartitioner}} when calling {{addSink}}, which should be sufficient?

Re 2: We are aware of this, and as discussed in the wiki, our current proposal is to use something similar to what you mentioned as {{range(K1 prefix)}} and check {{key.startsWith(prefix)}} to stop iterating. There are some optimizations with prefix seeking in RocksDB, but we need to contribute back to RocksDB's JNI to make use of them.

https://cwiki.apache.org/confluence/display/KAFKA/Discussion%3A+Non-key+KTable-KTable+Joins#Discussion:Non-keyKTable-KTableJoins-Simpleapproach:seekwithkeydirectly

Re 3: The idea is that for the repartitioning, we are going to first transform the old key-value pair into >, but when sending the key-value pair to the re-partition topic, specify the {{StreamPartitioner}} to partition based on combo (remember its assign API takes both the key and value), and let the joiner after the repartitioning be applied on only. When the old value needs to be sent as well, we are going to send the {new, old} pair separately as two records: >, and >, and still partition on combo and . These two records may be sent to two different partitions and hence processed by two different processors, which is expected behavior.

Does that look reasonable to you?
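The prefix scan proposed in "Re 2" can be sketched on any sorted store: seek to the prefix, then iterate until a key no longer starts with it. The example below is an illustration only, using a `TreeMap` with string keys as a stand-in for RocksDB's byte-ordered keys; `PrefixScanSketch`, `prefixScan` and the `|` separator inside the keys are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Prefix scan over a sorted map, standing in for a seek on a sorted key-value store.
final class PrefixScanSketch {

    // Returns the values of all entries whose key starts with the prefix, in key order.
    static List<String> prefixScan(NavigableMap<String, String> store, String prefix) {
        List<String> hits = new ArrayList<>();
        // tailMap(prefix, true) plays the role of "seek to the first key >= prefix".
        for (Map.Entry<String, String> e : store.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // keys are sorted, so no later key can match the prefix
            }
            hits.add(e.getValue());
        }
        return hits;
    }

    public static void main(String[] args) {
        NavigableMap<String, String> store = new TreeMap<>();
        store.put("a1|b1", "v1");
        store.put("a1|b2", "v2");
        store.put("a2|b1", "v3");
        System.out.println(prefixScan(store, "a1|"));
    }
}
```

The early `break` is what makes this cheaper than a full scan: because matching keys are contiguous in sort order, the iteration touches only the matches plus one extra entry.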
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340779#comment-15340779 ]

Jan Filipiak commented on KAFKA-3705:
-------------------------------------

A few things I came across while building the current implementation based on the processor API.

1. Partitioning
I ended up needing to pass an additional ValueMapper into the method. I had to use it in the sink's partitioner to extract the _partition/join-key_ from the key that is used for the repartition topic. It had to be extracted from the key because I still need to be able to pass null values to the correct partition for deletes. This came from not knowing the number of partitions in the processor but only in the partitioner, which made the "API" kind of complicated.

2. Range select
The ValueMapper mentioned above also had to be passed into the RocksDBIterator. Having KeyValueIterator<K, V> range(K from, K to) is not "natural" for prefix range queries. More natural would be KeyValueIterator<K, V> range(K1 prefix), where the Serde for K1 needs to produce bytes that are a prefix of those produced by the Serde for K.

3. Key expansion
After a join in this fashion, the key is what I started referring to as widened. Say you have KTable and it is the table that needs to be repartitioned and KP is the repartition key; then, independently of the other table, the new key of the table must include KR and AK, which is a weird thing compared to the traditional relational database way. Imagine having a result table as KTable,Pair>; then the formerly unique key AK is not unique anymore, and the processor might see the insert in one partition before the delete in the other (e.g. when the row's KP was updated). I think this should be embraced, because that is how it is. It should just be made apparent to the user, as it needs to be dealt with in downstream processors.
Unrelated to the topic of joining, the processor API is not necessarily comfortable. I appreciate the beauty of the threading model, but stitching graphs together based on processor names and strings is trickier than I thought. Anyhow, a really nice stream processing framework. It feels and looks so much better than what is out there, Spark or Storm. Watching their desperate attempts to put state in is a joy. Nice work. As soon as our implementation is hardened in production, I can probably share it.
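Point 1 above, routing deletes by a join key extracted from the record key, can be sketched as follows. This is not the implementation discussed in the comment: `JoinKeyPartitionerSketch`, the `"<joinKey>|<originalKey>"` key layout and the use of `Arrays.hashCode` in place of Kafka's murmur2 hash are all assumptions made to keep the example free of dependencies.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Picks a partition from the join-key part of a widened key, ignoring the value.
final class JoinKeyPartitionerSketch {

    // Hypothetical key layout: "<joinKey>|<originalKey>"; the key must contain a '|'.
    static String extractJoinKey(String widenedKey) {
        return widenedKey.substring(0, widenedKey.indexOf('|'));
    }

    // The value is deliberately unused: a delete arrives with a null value and must
    // still land on the partition that received the earlier insert.
    static int partition(String widenedKey, String value, int numPartitions) {
        byte[] bytes = extractJoinKey(widenedKey).getBytes(StandardCharsets.UTF_8);
        // Arrays.hashCode stands in for murmur2. The sign bit is cleared before the
        // modulo so the result is always in [0, numPartitions).
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }
}
```

Because only the join-key part of the key feeds the hash, all records that share a join key are co-partitioned regardless of their original key, and a tombstone follows the same route as the record it deletes.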
[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable
[ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15314643#comment-15314643 ] Guozhang Wang commented on KAFKA-3705: -- Another thing we may want to consider is to use a different memtable option in RocksDB: https://github.com/facebook/rocksdb/wiki/Hash-based-memtable-implementations