Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table
Oh ok, I had assumed that distinguishing cases 1 and 2 was possible. Thanks.

On 7 Jul 2016 1:02 p.m., "Guozhang Wang" wrote:
> [...]
Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table
Hello,

The problem is that in stream processing, when a new record arrives from the big table tB and does not find a match in the small table tA (which in practice holds only a subset of the keys), you cannot tell which of the following cases is true:

1. There was a matching record in tA, which got deleted before this tB record arrived. In this case we need to output a null record to "negate" the previous join result.
2. There was never a matching record in tA. In this case we actually do not need to output anything.

As I mentioned, the root cause is that tables in stream processing can be updated while processing is ongoing. As for the Kafka Streams implementation, if a null record is received by a downstream operator while there is no existing record for that key in the materialized view, it is treated as a no-op.

Guozhang

On Wed, Jul 6, 2016 at 3:59 AM, Philippe Derome wrote:
> [...]
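To make the no-op rule concrete, here is a toy model in plain Java. It assumes an in-memory map stands in for the downstream materialized view; `MaterializedView` and `apply` are hypothetical names, not Kafka Streams classes. The same {key, null} record removes a stale join result in case 1 and changes nothing in case 2.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model (hypothetical, not the Kafka Streams API) of a downstream materialized
// view of the join result. A {key, null} record deletes the key if it exists and
// is a no-op otherwise.
class MaterializedView {
    private final Map<String, String> store = new HashMap<>();

    // Applies one update and returns true if the view changed.
    boolean apply(String key, String value) {
        if (value == null) {
            // Case 1 (earlier match, since deleted): removes the stale join result.
            // Case 2 (never matched): nothing stored for the key, so nothing changes.
            return store.remove(key) != null;
        }
        // put() returns the previous value (or null); unchanged values report false.
        return !value.equals(store.put(key, value));
    }

    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        MaterializedView tC = new MaterializedView();
        tC.apply("k1", "a1+b1");                  // an earlier join result for k1
        System.out.println(tC.apply("k1", null)); // case 1: prints true, k1 removed
        System.out.println(tC.apply("k2", null)); // case 2: prints false, no-op
        System.out.println(tC.size());            // prints 0
    }
}
```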
Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table
Thanks, I understand that it's not modelled the same way as database joins.

Let's take the example of an inner join of a very small population set (NBA rookies or US senators) with a larger table (data on zip codes). Let's assume we want to identify the crime rate of the zip codes where current senators live, or the median income of the zip codes where NBA rookies currently live. These small elite populations will likely never live in the poorest 50% of US zip codes (although a few exceptionally might), and NBA rookies will not live far from their team's home base (so not in Maine, Alaska, Hawaii or North Dakota), so many zip codes will not match and are expected never to match. So I don't see the keys representing such zip codes becoming eventually consistent.

One can imagine an application that makes use of census data (with many zip codes) and is interested in many such statistics for several such small "elite" populations; the irrelevant zip codes with null records then find their way into the data pipeline multiple times.

I cannot think of a more egregious example, where one table has billions of keys and the other only a handful that would match, but I'd assume such use cases could be natural.

It seems to me that null records should be output to represent a record deletion in the resulting table, but not a near miss on data selection.

On Tue, Jul 5, 2016 at 12:44 AM, Guozhang Wang wrote:
> [...]
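The behaviour proposed above can be sketched as a filter sitting between the join and tC's changelog that writes a null record only when it deletes a previously emitted result. This is a minimal illustration in plain Java under that assumption, not how Kafka Streams works; `TombstoneFilter` and `forward` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the proposal: forward {key, null} only when it deletes a
// previously emitted join result, and drop it for keys that never matched.
class TombstoneFilter {
    private final Set<String> emittedKeys = new HashSet<>();
    final List<String> changelog = new ArrayList<>();

    void forward(String key, String value) {
        if (value != null) {
            // A join result: remember the key so a later null counts as a deletion.
            emittedKeys.add(key);
            changelog.add(key + "=" + value);
        } else if (emittedKeys.remove(key)) {
            // A real deletion of an existing join result: keep the null record.
            changelog.add(key + "=null");
        }
        // Otherwise a near miss on a key that never matched: nothing is written.
    }

    public static void main(String[] args) {
        TombstoneFilter f = new TombstoneFilter();
        f.forward("zip-1", "senator+stats"); // matched: written
        f.forward("zip-2", null);            // never matched: dropped
        f.forward("zip-3", null);            // never matched: dropped
        f.forward("zip-1", null);            // deletion: written
        System.out.println(f.changelog);     // prints [zip-1=senator+stats, zip-1=null]
    }
}
```

In this sketch the filter has to remember every key it has ever emitted, so suppressing near misses costs state roughly proportional to the number of keys in tC.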
Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table
Hello,

The KTable join semantics are not exactly the same as those of an RDBMS. You can find the detailed semantics in the web docs (search for "Joining Streams"):

http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl

In a nutshell, the joiner is triggered only if the joining streams have a record matching the key of the incoming record:

- join: both streams must have it, so neither input value of the joiner can be null;
- leftJoin: the left stream must have it, so only the other value can be null;
- outerJoin: either stream may have it, so either value can be null, but not both.

Otherwise a pair {join-key, null} is output. We made this design choice deliberately to make sure that "table-table joins are eventually consistent". This gives resilience to late-arriving records: a late arrival in either stream can "update" the join result.

Guozhang

On Mon, Jul 4, 2016 at 6:10 PM, Philippe Derome wrote:
> [...]
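A toy, in-memory model of the rule described above may help. It assumes two plain maps stand in for the joined tables and ignores old values, caching and record ordering; `ToyTableJoin`, `JoinType` and `onUpdate` are hypothetical names, not the Kafka Streams API. A `null` return stands for the {join-key, null} record.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model (hypothetical, not the Kafka Streams API) of when a KTable-KTable
// joiner is triggered for inner, left and outer joins.
class ToyTableJoin {
    enum JoinType { INNER, LEFT, OUTER }

    final Map<String, String> left = new HashMap<>();
    final Map<String, String> right = new HashMap<>();
    final JoinType type;
    final BiFunction<String, String, String> joiner;

    ToyTableJoin(JoinType type, BiFunction<String, String, String> joiner) {
        this.type = type;
        this.joiner = joiner;
    }

    // Applies an update (null value = delete) to one side and returns the new join
    // result for that key; null stands for the {join-key, null} output.
    String onUpdate(boolean fromLeft, String key, String value) {
        Map<String, String> side = fromLeft ? left : right;
        if (value == null) {
            side.remove(key);
        } else {
            side.put(key, value);
        }
        String l = left.get(key);
        String r = right.get(key);
        boolean trigger;
        switch (type) {
            case INNER: trigger = l != null && r != null; break; // both must match
            case LEFT:  trigger = l != null;              break; // left must match
            default:    trigger = l != null || r != null; break; // OUTER: either side
        }
        return trigger ? joiner.apply(l, r) : null;
    }

    public static void main(String[] args) {
        ToyTableJoin tC = new ToyTableJoin(JoinType.LEFT, (a, b) -> a + "+" + b);
        System.out.println(tC.onUpdate(false, "kB", "b1")); // right-only key: null
        System.out.println(tC.onUpdate(true, "kB", "a1"));  // late left arrival: a1+b1
        System.out.println(tC.onUpdate(false, "kB", null)); // right deleted: a1+null
    }
}
```

The second call shows the late-arrival behaviour Guozhang mentions: a record arriving later on the left side updates the earlier {kB, null} result into a real join result.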
Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table
The same happens for a regular join: keys that appear in only one stream make it into the output KTable tC with a null value, whichever input stream they came from. I guess it's related to KAFKA-3911 (Enforce KTable materialization) or the umbrella JIRA KAFKA-3909 (Queryable state for Kafka Streams)?

On Mon, Jul 4, 2016 at 8:45 PM, Philippe Derome wrote:
> If we have two streams A and B with associated tables tA and tB, then
> create a table tC as tA.leftJoin(tB, ), and then a key kB arrives in
> stream B that never made it into tA or tC, do we need to inject a pair
> (k, v) of (kB, null) into the resulting changelog for tC?
>
> It sounds definitely necessary if key kB is present in table tC, but if
> not, why add it?
>
> I have an example that reproduces this and would like to know if it is
> considered normal, sub-optimal, or a defect. I don't view it as normal for
> the time being, particularly considering stream A as having very few keys
> and B as having many, which could lead to an unnecessarily large changelog
> for C.
>
> Phil
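A small simulation of the scenario in the question, assuming plain maps for tA and tB and a left-join rule that emits {key, null} whenever tA has no match for an incoming B key; `ChangelogSim`, `onB` and `nullRecords` are hypothetical names, not Kafka Streams classes. With 2 keys in tA and 1,000 distinct keys arriving on B, 998 of the 1,000 records written to tC's changelog are null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of tC = tA.leftJoin(tB, ...) where tA holds very few keys
// and tB many. Plain maps stand in for the tables; tA is populated up front.
class ChangelogSim {
    final Map<String, String> tA = new HashMap<>();
    final Map<String, String> tB = new HashMap<>();
    // Each entry is a {key, value} pair appended to tC's changelog; value may be null.
    final List<String[]> tCChangelog = new ArrayList<>();

    // A record arriving on the right-hand side (stream B / table tB).
    void onB(String key, String value) {
        tB.put(key, value);
        String a = tA.get(key);
        // Join only when tA has the key; otherwise emit {key, null}.
        tCChangelog.add(new String[] {key, a == null ? null : a + "|" + value});
    }

    long nullRecords() {
        return tCChangelog.stream().filter(kv -> kv[1] == null).count();
    }

    public static void main(String[] args) {
        ChangelogSim sim = new ChangelogSim();
        sim.tA.put("k1", "a1");
        sim.tA.put("k2", "a2");
        for (int i = 1; i <= 1000; i++) {
            sim.onB("k" + i, "b" + i);
        }
        // Prints "1000 records, 998 null": one null per B key never seen in tA.
        System.out.println(sim.tCChangelog.size() + " records, " + sim.nullRecords() + " null");
    }
}
```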