Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join
Moved this KIP into status "inactive". Feel free to resume at any time.

-Matthias

On 7/1/18 9:47 PM, Guozhang Wang wrote:
> Hello Adam,
>
> Sorry for being late on this thread. I've read through your updated wiki and here are some thoughts:
>
> * I agree with your assessed impediments. In fact, today, although the Global KTable has its own checkpoint files and restoration process, during its restoration it will always try to bootstrap the backing global store up to the Kafka topic's end offset before starting any processing. And hence on each machine node the checkpoint offsets... For the proposed change, in which the Global KTable also needs to trigger joins, it means its restoration process would not fit as well. On the other hand, even for Global KTable - Stream joins we do not have such guarantees either: imagine there is a crash, or even a graceful shutdown; when the task is back online and the Global KTable is bootstrapped, it is not guaranteed to be at the same offset position as when it stopped anyway. So I think one can argue that users of the Global KTable - KTable join should not expect these semantics either.
>
> * The other issue, which I mentioned above, is that the updates of the joins triggered by the Global KTable, and hence executed by the global thread, now need to be propagated to the downstream operators and, more importantly, must follow the order of the join: e.g. a record comes from the Global KTable, and then later another record comes from the other KTable. It means that the global thread and the stream thread need to be synchronized (note that today these threads run totally in parallel to each other).
>
> With that, I think we can 1) continue working on KIP-213 for local KTable joins, and 2) continue this KIP for Global KTable - KTable joins, while educating users that, similar to Global KTable - KStream joins, the Global KTable will not trigger joins.
>
> Guozhang
Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join
Hello Adam,

Sorry for being late on this thread. I've read through your updated wiki and here are some thoughts:

* I agree with your assessed impediments. In fact, today, although the Global KTable has its own checkpoint files and restoration process, during its restoration it will always try to bootstrap the backing global store up to the Kafka topic's end offset before starting any processing. And hence on each machine node the checkpoint offsets... For the proposed change, in which the Global KTable also needs to trigger joins, it means its restoration process would not fit as well. On the other hand, even for Global KTable - Stream joins we do not have such guarantees either: imagine there is a crash, or even a graceful shutdown; when the task is back online and the Global KTable is bootstrapped, it is not guaranteed to be at the same offset position as when it stopped anyway. So I think one can argue that users of the Global KTable - KTable join should not expect these semantics either.

* The other issue, which I mentioned above, is that the updates of the joins triggered by the Global KTable, and hence executed by the global thread, now need to be propagated to the downstream operators and, more importantly, must follow the order of the join: e.g. a record comes from the Global KTable, and then later another record comes from the other KTable. It means that the global thread and the stream thread need to be synchronized (note that today these threads run totally in parallel to each other).

With that, I think we can 1) continue working on KIP-213 for local KTable joins, and 2) continue this KIP for Global KTable - KTable joins, while educating users that, similar to Global KTable - KStream joins, the Global KTable will not trigger joins.

Guozhang

On Mon, Jun 25, 2018 at 11:05 AM, Adam Bellemare wrote:
> Thanks for your help so far guys.
>
> While I do think that I have a fairly reasonable way forward for restructuring the topologies and threads, there is, unfortunately, what I believe is a fatal flaw that cannot be easily resolved. I have updated the page ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-314%3A+KTable+to+GlobalKTable+Bi-directional+Join ) with the impediments to the solution, all of which revolve around ensuring that data consistency is maintained. It seems to me that GlobalKTables are not the way forward here and that I may be best redirecting my efforts towards KIP-213 ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable ).
>
> I would appreciate being proven wrong on my impediment listings, but if there are no other ideas I think we should close this KIP and the associated JIRA. A KTable to GlobalKTable join driven just by the KTable is simply performed by a stream to GKT join with a groupByKey and reduce to form a state-store, so I would see no need to keep it open otherwise (unless just for the shorthand notation).
>
> Thanks again
>
> Adam
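The ordering concern in this message (updates from the global thread and from the stream thread arriving in an unsynchronized order) can be pictured with a small in-memory model. This is only a sketch: it does not use the Kafka Streams API, it joins on equal keys for simplicity even though the KIP is about a non-key join, and the class and method names (`BiDirectionalJoinOrdering`, `onLeft`, `onGlobal`, `run`) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a join in which BOTH sides trigger output (hypothetical, not Kafka Streams code).
class BiDirectionalJoinOrdering {
    final Map<String, String> left = new HashMap<>();    // stands in for the KTable side
    final Map<String, String> global = new HashMap<>();  // stands in for the GlobalKTable side
    final List<String> output = new ArrayList<>();       // records forwarded downstream

    void onLeft(String key, String value)   { left.put(key, value);   emit(key); }
    void onGlobal(String key, String value) { global.put(key, value); emit(key); }

    // Inner join: emit only when both sides currently hold a value for the key.
    private void emit(String key) {
        String l = left.get(key);
        String g = global.get(key);
        if (l != null && g != null) {
            output.add(key + "=" + l + "+" + g);
        }
    }

    // Replays the same four updates; only the interleaving of the last two differs.
    static List<String> run(boolean globalFirst) {
        BiDirectionalJoinOrdering join = new BiDirectionalJoinOrdering();
        join.onLeft("k", "L1");
        join.onGlobal("k", "G1");
        if (globalFirst) {
            join.onGlobal("k", "G2");
            join.onLeft("k", "L2");
        } else {
            join.onLeft("k", "L2");
            join.onGlobal("k", "G2");
        }
        return join.output;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // [k=L1+G1, k=L1+G2, k=L2+G2]
        System.out.println(run(false));  // [k=L1+G1, k=L2+G1, k=L2+G2]
    }
}
```

Both interleavings end in the same final result, but the intermediate records seen downstream differ, which is why two independently running threads would need some form of synchronization to give a defined order.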
Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join
Thanks for your help so far guys.

While I do think that I have a fairly reasonable way forward for restructuring the topologies and threads, there is, unfortunately, what I believe is a fatal flaw that cannot be easily resolved. I have updated the page ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-314%3A+KTable+to+GlobalKTable+Bi-directional+Join ) with the impediments to the solution, all of which revolve around ensuring that data consistency is maintained. It seems to me that GlobalKTables are not the way forward here and that I may be best redirecting my efforts towards KIP-213 ( https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable ).

I would appreciate being proven wrong on my impediment listings, but if there are no other ideas I think we should close this KIP and the associated JIRA. A KTable to GlobalKTable join driven just by the KTable is simply performed by a stream to GKT join with a groupByKey and reduce to form a state-store, so I would see no need to keep it open otherwise (unless just for the shorthand notation).

Thanks again

Adam

On Fri, Jun 22, 2018 at 9:00 PM, Guozhang Wang wrote:
> Hello Adam,
>
> Please see my comments inline.
>
> On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare wrote:
>
> > Hi Guozhang
> >
> > *Re: Questions*
> > *1)* I do not yet have a solution to this, but I also did not look that closely at it when I began this KIP. I admit that I was unaware of exactly how the GlobalKTable worked alongside the KTable/KStream topologies. You mention "It means the two topologies will be merged, and that merged topology can only be executed as a single task, by a single thread." - is the problem here that the merged topology would be parallelized to other threads/instances? While I am becoming familiar with how the topologies are created under the hood, I am not yet fully clear on the implications of your statement. I will look into this further.
>
> Yes. The issue is that today each task is executed by only a single thread at any given time, and hence any state store is only accessed by a single thread (except for interactive queries, and for global tables, where the global update thread writes to the global store and the local thread reads from it). If we let the global store update thread also trigger joins and send the results to the downstream operators, then the global store update thread can access any state store in the subsequent part of the topology, breaking our current threading model.
>
> > *2)* "do you mean that although we have a duplicated state store: ModifiedEvents in addition to the original Events with only the enhanced key, this is not avoidable anyways even if we do re-keying?" Yes, that is correct, that is what I meant. I need to improve my knowledge around this component too. I have been browsing the KIP-213 discussion thread and looking at Jan's code.
> >
> > *Re: Comments*
> > *1)* Makes sense. I will update the diagram accordingly. Thanks!
> >
> > *2)* Wouldn't an outer join require that we emit records from the right GlobalKTable that have no match in the left KTable? This seems undefined to me with the current proposal (above issues aside), since multiple threads would be producing the same output event for a single GlobalKTable update.
>
> I was mainly considering the semantics of table-table joins, i.e. whether we should add this operator to our API. Implementation-wise, we will only have one global store update thread per instance, so there will not be multiple threads producing the same output, but there would still be other issues that we should consider, as mentioned above. Again, this comment is not about the implementation, but about whether it is desirable to add it API-wise.
>
> > Questions for you both:
> > Q1) Is a KTable always materialized? I am looking at the code under the hood, and it seems to me that it's either materialized with an explicit Materialized object, or it's given an anonymous name and the default serdes are used. Am I correct in this observation?
>
> A KTable is not always materialized. For example, a KTable generated from `KTable#filter` or `KTable#mapValues` does not create a new materialized state store; instead we use the caller `KTable`'s state store for anyone who wants to query it in joins.
>
> Moving forward, we are also trying to optimize the topology to only "logically" materialize a KTable when necessary; this is summarized in https://issues.apache.org/jira/browse/KAFKA-6761
>
> > Thanks,
> > Adam
>
> -- Guozhang
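Adam's remark that a join driven only by the KTable side amounts to a stream-to-GlobalKTable lookup followed by groupByKey and reduce into a state store can be sketched with plain maps. This is a toy model under stated assumptions, not Kafka Streams code: the global table is a passive lookup map, the left record's value is assumed to be the key into the global table, and the names (`KTableDrivenJoinSketch`, `onLeftRecord`, `resultStore`) are invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical names): only left-side records trigger the join.
class KTableDrivenJoinSketch {
    // Passive side: updated in the background, never triggers output.
    final Map<String, String> globalTable = new HashMap<>();
    // Result store: latest joined value per key, i.e. what a
    // groupByKey + reduce((previous, latest) -> latest) would keep.
    final Map<String, String> resultStore = new HashMap<>();

    // Handles one record from the driving side; foreignKey is looked up in the global table.
    void onLeftRecord(String key, String foreignKey) {
        String right = globalTable.get(foreignKey);
        if (right != null) {                       // inner join: no match, no output
            resultStore.put(key, foreignKey + "/" + right);
        }
    }

    public static void main(String[] args) {
        KTableDrivenJoinSketch join = new KTableDrivenJoinSketch();
        join.globalTable.put("c1", "Alice");
        join.onLeftRecord("order-1", "c1");
        System.out.println(join.resultStore.get("order-1")); // c1/Alice

        join.globalTable.put("c1", "Alicia");                 // global update alone
        System.out.println(join.resultStore.get("order-1")); // c1/Alice (unchanged)

        join.onLeftRecord("order-1", "c1");                   // left side re-triggers
        System.out.println(join.resultStore.get("order-1")); // c1/Alicia
    }
}
```

The middle step shows the limitation discussed in the thread: a change on the global side is not reflected in the result until the driving side sends another record for that key.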
Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join
Hello Adam,

Please see my comments inline.

On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare wrote:

> Hi Guozhang
>
> *Re: Questions*
> *1)* I do not yet have a solution to this, but I also did not look that closely at it when I began this KIP. I admit that I was unaware of exactly how the GlobalKTable worked alongside the KTable/KStream topologies. You mention "It means the two topologies will be merged, and that merged topology can only be executed as a single task, by a single thread." - is the problem here that the merged topology would be parallelized to other threads/instances? While I am becoming familiar with how the topologies are created under the hood, I am not yet fully clear on the implications of your statement. I will look into this further.

Yes. The issue is that today each task is executed by only a single thread at any given time, and hence any state store is only accessed by a single thread (except for interactive queries, and for global tables, where the global update thread writes to the global store and the local thread reads from it). If we let the global store update thread also trigger joins and send the results to the downstream operators, then the global store update thread can access any state store in the subsequent part of the topology, breaking our current threading model.

> *2)* "do you mean that although we have a duplicated state store: ModifiedEvents in addition to the original Events with only the enhanced key, this is not avoidable anyways even if we do re-keying?" Yes, that is correct, that is what I meant. I need to improve my knowledge around this component too. I have been browsing the KIP-213 discussion thread and looking at Jan's code.
>
> *Re: Comments*
> *1)* Makes sense. I will update the diagram accordingly. Thanks!
>
> *2)* Wouldn't an outer join require that we emit records from the right GlobalKTable that have no match in the left KTable? This seems undefined to me with the current proposal (above issues aside), since multiple threads would be producing the same output event for a single GlobalKTable update.

I was mainly considering the semantics of table-table joins, i.e. whether we should add this operator to our API. Implementation-wise, we will only have one global store update thread per instance, so there will not be multiple threads producing the same output, but there would still be other issues that we should consider, as mentioned above. Again, this comment is not about the implementation, but about whether it is desirable to add it API-wise.

> Questions for you both:
> Q1) Is a KTable always materialized? I am looking at the code under the hood, and it seems to me that it's either materialized with an explicit Materialized object, or it's given an anonymous name and the default serdes are used. Am I correct in this observation?

A KTable is not always materialized. For example, a KTable generated from `KTable#filter` or `KTable#mapValues` does not create a new materialized state store; instead we use the caller `KTable`'s state store for anyone who wants to query it in joins.

Moving forward, we are also trying to optimize the topology to only "logically" materialize a KTable when necessary; this is summarized in https://issues.apache.org/jira/browse/KAFKA-6761

> Thanks,
> Adam

-- Guozhang
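The answer to Q1 (a table derived via `KTable#filter` keeps no store of its own and lookups go through the parent's store) can be illustrated with a small read-through view. This is a sketch only, assuming an in-memory map as the parent store; `FilteredView` is a hypothetical name and not a Kafka Streams class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Toy model (hypothetical): a filtered table that owns no storage and
// answers lookups by reading the parent's store and applying the predicate.
class FilteredView {
    private final Map<String, Integer> parentStore;
    private final Predicate<Integer> predicate;

    FilteredView(Map<String, Integer> parentStore, Predicate<Integer> predicate) {
        this.parentStore = parentStore;
        this.predicate = predicate;
    }

    // Returns the parent's value if it passes the filter, otherwise null.
    Integer get(String key) {
        Integer value = parentStore.get(key);
        return (value != null && predicate.test(value)) ? value : null;
    }

    public static void main(String[] args) {
        Map<String, Integer> parent = new HashMap<>();   // the only physical store
        FilteredView positives = new FilteredView(parent, v -> v > 0);

        parent.put("a", 5);
        parent.put("b", -1);
        System.out.println(positives.get("a")); // 5
        System.out.println(positives.get("b")); // null (filtered out)

        parent.put("b", 7);                     // parent update is visible immediately
        System.out.println(positives.get("b")); // 7
    }
}
```

The trade-off is that every lookup re-applies the predicate, in exchange for not storing the filtered data a second time.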
Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join
Hi Guozhang

*Re: Questions*
*1)* I do not yet have a solution to this, but I also did not look that closely at it when I began this KIP. I admit that I was unaware of exactly how the GlobalKTable worked alongside the KTable/KStream topologies. You mention "It means the two topologies will be merged, and that merged topology can only be executed as a single task, by a single thread." - is the problem here that the merged topology would be parallelized to other threads/instances? While I am becoming familiar with how the topologies are created under the hood, I am not yet fully clear on the implications of your statement. I will look into this further.

*2)* "do you mean that although we have a duplicated state store: ModifiedEvents in addition to the original Events with only the enhanced key, this is not avoidable anyways even if we do re-keying?" Yes, that is correct, that is what I meant. I need to improve my knowledge around this component too. I have been browsing the KIP-213 discussion thread and looking at Jan's code.

*Re: Comments*
*1)* Makes sense. I will update the diagram accordingly. Thanks!

*2)* Wouldn't an outer join require that we emit records from the right GlobalKTable that have no match in the left KTable? This seems undefined to me with the current proposal (above issues aside), since multiple threads would be producing the same output event for a single GlobalKTable update.

Questions for you both:
Q1) Is a KTable always materialized? I am looking at the code under the hood, and it seems to me that it's either materialized with an explicit Materialized object, or it's given an anonymous name and the default serdes are used. Am I correct in this observation?

Thanks,
Adam

On Wed, Jun 20, 2018 at 6:44 PM, Guozhang Wang wrote:
> Hello Adam,
>
> Thanks for proposing the KIP. A few meta comments:
>
> 1. As Matthias mentioned, the current GlobalKTable is designed to be read-only and not to drive any computations (btw the global store backing a GlobalKTable should also be read-only). Behind the scenes, the global store updating task and the regular streams task are two separate tasks running two separate processor topologies on two threads: the global store updating task's topology is simply a source node plus a processor node (let's call it the update-processor) that puts to the store. If we allow the GlobalKTable to drive the join, then we need the underlying global store's update processor to link to the downstream processors of the regular task's topology in order to pass the joined results downstream. It means the two topologies will be merged, and that merged topology can only be executed as a single task, by a single thread. We need to think of a way to work around this issue first, before proceeding to the next steps.
>
> 2. It is not clear what you mean by "In terms of data complexity, any pattern that requires us to rekey the data once is equivalent in terms of data capacity requirements.." Do you mean that although we have a duplicated state store: ModifiedEvents in addition to the original Events with only the enhanced key, this is not avoidable anyways even if we do re-keying? Note that in https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable?preview=/74684836/74687529/Screenshot%20from%202017-11-18%2023%3A26%3A52.png we were considering whether it is still possible to materialize each of the joining tables only once, i.e. not having a duplicated store. So I think it is not necessarily the case that we have to duplicate the KTable's store.
>
> Two minor comments:
>
> 1. In the *KTable as Driver, joined on GlobalKTable join mechanism* section, I think we still need to join the old value with the global store to form a pair of "<old, new>" joined results, so that the resulting KTable can still be used in another aggregation operator that applies correct addition / subtraction logic.
>
> 2. For KTable-KTable joins we have inner / left / outer, while for KStream-KTable / GlobalKTable joins we only have inner / left, the reason being that outer joins make less sense for stream-table joins; should we consider outer for the KTable-GlobalKTable join as well?
>
> Guozhang
Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join
Hello Adam,

Thanks for proposing the KIP. A few meta comments:

1. As Matthias mentioned, the current GlobalKTable is designed to be read-only and not to drive any computations (btw the global store backing a GlobalKTable should also be read-only). Behind the scenes, the global store updating task and the regular streams task are two separate tasks running two separate processor topologies on two threads: the global store updating task's topology is simply a source node plus a processor node (let's call it the update-processor) that puts to the store. If we allow the GlobalKTable to drive the join, then we need the underlying global store's update processor to link to the downstream processors of the regular task's topology in order to pass the joined results downstream. It means the two topologies will be merged, and that merged topology can only be executed as a single task, by a single thread. We need to think of a way to work around this issue first, before proceeding to the next steps.

2. It is not clear what you mean by "In terms of data complexity, any pattern that requires us to rekey the data once is equivalent in terms of data capacity requirements.." Do you mean that although we have a duplicated state store: ModifiedEvents in addition to the original Events with only the enhanced key, this is not avoidable anyways even if we do re-keying? Note that in https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable?preview=/74684836/74687529/Screenshot%20from%202017-11-18%2023%3A26%3A52.png we were considering whether it is still possible to materialize each of the joining tables only once, i.e. not having a duplicated store. So I think it is not necessarily the case that we have to duplicate the KTable's store.

Two minor comments:

1. In the *KTable as Driver, joined on GlobalKTable join mechanism* section, I think we still need to join the old value with the global store to form a pair of "<old, new>" joined results, so that the resulting KTable can still be used in another aggregation operator that applies correct addition / subtraction logic.

2. For KTable-KTable joins we have inner / left / outer, while for KStream-KTable / GlobalKTable joins we only have inner / left, the reason being that outer joins make less sense for stream-table joins; should we consider outer for the KTable-GlobalKTable join as well?

Guozhang

On Tue, Jun 19, 2018 at 10:27 AM, Adam Bellemare wrote:
> Matthias
>
> Thanks for the links. I have seen those before but I will dig deeper into them, especially around the CombinedKey and the flush + cache + rangescan functionality. I believe Jan had a PR with many of the changes in there; perhaps I can use some of the work that was done there to help leverage a similar (or identical) design.
>
> I will certainly be able to make a PoC before going to vote on this one. It is a larger change and I suspect that we will need to review some of the finer points to ensure that the design is still suitable and sufficiently performant. I'll post back when I have something more concrete, but in the meantime I welcome all other concerns and comments.
>
> Thanks
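The minor comment about also joining the old value, so that a downstream aggregation can subtract and add correctly, is easier to see with a worked example. The following is a minimal sketch and not Kafka Streams code: it assumes integer values, a running sum as the aggregation, and `null` as the marker for an absent value; `ChangeAggregationSketch`, `Change`, `upsert` and `apply` are names chosen for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical): table updates are forwarded as an old/new pair so that
// an aggregate can first retract the old contribution and then add the new one.
class ChangeAggregationSketch {
    static final class Change {
        final Integer oldValue;   // null if the key was absent before
        final Integer newValue;   // null if the key is being deleted
        Change(Integer oldValue, Integer newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }
    }

    final Map<String, Integer> table = new HashMap<>();
    int sum = 0;

    // Applies an update to the table and returns the old/new pair to forward.
    Change upsert(String key, Integer newValue) {
        Integer old = (newValue == null) ? table.remove(key) : table.put(key, newValue);
        return new Change(old, newValue);
    }

    // Downstream aggregation: subtract the old value, add the new one.
    void apply(Change change) {
        if (change.oldValue != null) sum -= change.oldValue;
        if (change.newValue != null) sum += change.newValue;
    }

    public static void main(String[] args) {
        ChangeAggregationSketch s = new ChangeAggregationSketch();
        s.apply(s.upsert("a", 10));
        s.apply(s.upsert("b", 5));
        System.out.println(s.sum); // 15
        s.apply(s.upsert("a", 4));    // replaces 10 with 4 instead of adding 4 on top
        System.out.println(s.sum); // 9
        s.apply(s.upsert("a", null)); // delete
        System.out.println(s.sum); // 5
    }
}
```

Without the old value in the forwarded record, the second update to "a" would leave the sum at 19, since the aggregator would have nothing to retract.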
Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join
Matthias

Thanks for the links. I have seen those before but I will dig deeper into them, especially around the CombinedKey and the flush + cache + rangescan functionality. I believe Jan had a PR with many of the changes in there; perhaps I can use some of the work that was done there to help leverage a similar (or identical) design.

I will certainly be able to make a PoC before going to vote on this one. It is a larger change and I suspect that we will need to review some of the finer points to ensure that the design is still suitable and sufficiently performant. I'll post back when I have something more concrete, but in the meantime I welcome all other concerns and comments.

Thanks

On Mon, Jun 18, 2018 at 10:05 PM, Matthias J. Sax wrote:
> Adam,
>
> thanks a lot for the KIP. I agree that this would be a valuable feature to add. It's a very complex one though. You correctly pointed out that the GlobalKTable (or global stores in general) cannot be the "driver" atm and are passively updated only. This is by design. Are you familiar with the KIP discussion of KIP-99? (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649) It would be worth refreshing on it to understand the tradeoffs and design decisions.
>
> It's unclear to me what the impact will be if we want to change the current design. Even if no GlobalKTable is used, it might have an impact on performance and for sure on code complexity. Overall, it seems that a POC might be required before we can consider adding this (with the danger that it does not get accepted in the end).
>
> Are you aware of KIP-213: https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>
> It suggests adding non-key joins, and a lot of issues about how to implement this were discussed already. As a KTable-GlobalKTable join is a non-key join, too, it seems that those discussions apply to your KIP too.
>
> Hope this helps with the next steps.
>
> -Matthias
Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join
Adam,

thanks a lot for the KIP. I agree that this would be a valuable feature to add. It's a very complex one though. You correctly pointed out that the GlobalKTable (or global stores in general) cannot be the "driver" atm and are passively updated only. This is by design. Are you familiar with the KIP discussion of KIP-99? (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649) It would be worth refreshing on it to understand the tradeoffs and design decisions.

It's unclear to me what the impact will be if we want to change the current design. Even if no GlobalKTable is used, it might have an impact on performance and for sure on code complexity. Overall, it seems that a POC might be required before we can consider adding this (with the danger that it does not get accepted in the end).

Are you aware of KIP-213: https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

It suggests adding non-key joins, and a lot of issues about how to implement this were discussed already. As a KTable-GlobalKTable join is a non-key join, too, it seems that those discussions apply to your KIP too.

Hope this helps with the next steps.

-Matthias

On 6/18/18 1:15 PM, Adam Bellemare wrote:
> Hi All
>
> I created KIP-314 and I would like to initiate a discussion on it.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-314%3A+KTable+to+GlobalKTable+Bi-directional+Join
>
> The primary goal of this KIP is to improve the way that Kafka can deal with relational data at scale. This KIP would alter the way that GlobalKTables can be used in relation to KTables. I believe that this would be a very useful change but I need some eyes on the technical aspects to validate or refute the strategy.
>
> Thanks
>
> Adam Bellemare
[DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join
Hi All

I created KIP-314 and I would like to initiate a discussion on it.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-314%3A+KTable+to+GlobalKTable+Bi-directional+Join

The primary goal of this KIP is to improve the way that Kafka can deal with relational data at scale. This KIP would alter the way that GlobalKTables can be used in relation to KTables. I believe that this would be a very useful change but I need some eyes on the technical aspects to validate or refute the strategy.

Thanks

Adam Bellemare