Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table

2016-07-07 Philippe Derome
Oh, OK. I had assumed that distinguishing cases 1 and 2 was possible.

Thanks.
On 7 Jul 2016 1:02 p.m., "Guozhang Wang"  wrote:

Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table

2016-07-07 Guozhang Wang
Hello,

The problem is that in stream processing, when a new record arrives from
the big table tB and does not find a match in the small table tA (which in
practice holds only a subset of the keys), you cannot tell which of the
following cases is true:

1. There was a matching record in tA, which was deleted before this tB
record arrived. In this case we need to output a null record to "negate"
the previous join result.
2. There was never a matching record in tA. In this case we actually do not
need to output anything.

As I mentioned, the root cause is that tables in stream processing can be
updated while processing is ongoing. As for the Kafka Streams
implementation, if a downstream operator receives a null record while there
is no existing record for that key in its materialized view, the null
record is treated as a no-op.
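
To make the two cases concrete, here is a minimal Java simulation of tA.leftJoin(tB). It is not Kafka Streams code: the class, its methods and the string joiner are all hypothetical. The join operator sees only the current contents of tA and tB, so a tB record that misses in tA yields (key, null) whether or not tA ever held that key, and the downstream view of tC treats a null for an absent key as a no-op.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, simplified simulation of tA.leftJoin(tB); NOT Kafka Streams code.
class LeftJoinAmbiguitySketch {
    final Map<String, String> tA = new HashMap<>();
    final Map<String, String> tB = new HashMap<>();
    // Downstream materialized view of the join result tC.
    final Map<String, String> tC = new HashMap<>();
    // Every (key, value) pair the join forwards, i.e. tC's changelog.
    final List<String> changelog = new ArrayList<>();
    // How many forwarded nulls hit a key that tC did not contain.
    int noOps = 0;

    private final BiFunction<String, String, String> joiner = (a, b) -> a + "+" + b;

    void updateA(String key, String value) { apply(tA, key, value); join(key); }

    void updateB(String key, String value) { apply(tB, key, value); join(key); }

    private static void apply(Map<String, String> table, String key, String value) {
        if (value == null) table.remove(key); else table.put(key, value);
    }

    // The join sees only the *current* tA and tB. On a miss it cannot tell
    // case 1 (tA had the key earlier) from case 2 (tA never had it), so it
    // always forwards (key, null).
    private void join(String key) {
        String a = tA.get(key);
        forward(key, a == null ? null : joiner.apply(a, tB.get(key)));
    }

    // A null for a key absent from the view changes nothing (a no-op).
    private void forward(String key, String value) {
        changelog.add(key + "=" + value);
        if (value != null) {
            tC.put(key, value);
        } else if (tC.remove(key) == null) {
            noOps++;
        }
    }

    public static void main(String[] args) {
        LeftJoinAmbiguitySketch case1 = new LeftJoinAmbiguitySketch();
        case1.updateA("k", "a1");   // k joins
        case1.updateA("k", null);   // tA's record for k is deleted
        case1.updateB("k", "b1");   // tB record arrives afterwards

        LeftJoinAmbiguitySketch case2 = new LeftJoinAmbiguitySketch();
        case2.updateB("k", "b1");   // tA never had k

        System.out.println(case1.changelog); // [k=a1+null, k=null, k=null]
        System.out.println(case2.changelog); // [k=null]
    }
}
```

In both runs the final tB update forwards the same k=null record and tC ends up empty; only knowledge of tC's past contents, which this operator does not consult, could separate the two cases.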


Guozhang


On Wed, Jul 6, 2016 at 3:59 AM, Philippe Derome  wrote:

-- 
-- Guozhang


Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table

2016-07-06 Philippe Derome
Thanks, I understand that it's not modelled the same way as database joins.

Let's take the example of an inner join of a very small population set (NBA
rookies or US senators) with a larger table (data on zip codes). Let's assume
we want to identify the crime rate of the zip codes where current senators
live, or the median income of the zip codes where NBA rookies currently live.
These small elite populations will likely never live in the poorest 50% of
US zip codes (although a few exceptionally might), and NBA rookies will not
live far from their team's home base (so not in Maine, Alaska, Hawaii or
North Dakota), so many zip codes will not match and are expected never to
match. So I don't see how the keys representing such zip codes would become
eventually consistent.

One can imagine an application that makes use of census data (with many zip
codes) and is interested in many such statistics for several such small
"elite" populations; the irrelevant zip codes with null records then find
their way into the data pipeline multiple times.

I cannot think of a more egregious example than one where one table has
billions of keys and the other only a handful that would match, but I'd
assume that such use cases could arise naturally.

It seems to me that (key, null) records should be output to represent a
record deletion in the resulting table, but not a near miss on data
selection.
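
The rule proposed in the last paragraph can be sketched in Java under one extra assumption: the join operator keeps its own set of the keys that currently have a row in tC. All names are hypothetical, and this is not how Kafka Streams is described as working in this thread. A (key, null) record is forwarded only when it deletes an existing row of tC; a near miss on a key that never joined forwards nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical left join tA.leftJoin(tB) that suppresses "near miss" nulls.
// It remembers which keys currently have a row in tC (extra state) and
// forwards (key, null) only when that null deletes an existing row.
class SuppressingLeftJoinSketch {
    private final Map<String, String> tA = new HashMap<>();
    private final Map<String, String> tB = new HashMap<>();
    private final Set<String> keysInC = new HashSet<>();
    final List<String> changelog = new ArrayList<>();

    void updateA(String key, String value) { apply(tA, key, value); join(key); }

    void updateB(String key, String value) { apply(tB, key, value); join(key); }

    private static void apply(Map<String, String> table, String key, String value) {
        if (value == null) table.remove(key); else table.put(key, value);
    }

    private void join(String key) {
        String a = tA.get(key);
        if (a != null) {
            keysInC.add(key);
            changelog.add(key + "=" + a + "+" + tB.get(key));
        } else if (keysInC.remove(key)) {
            changelog.add(key + "=null"); // a real deletion from tC
        }
        // else: the key never joined, so nothing is forwarded.
    }

    public static void main(String[] args) {
        SuppressingLeftJoinSketch join = new SuppressingLeftJoinSketch();
        join.updateA("z1", "senator");        // small table: one key
        for (int i = 0; i < 1000; i++) {      // big table: 1000 zip codes
            join.updateB("z" + i, "stats" + i);
        }
        join.updateA("z1", null);             // the senator moves away
        System.out.println(join.changelog);
        // [z1=senator+null, z1=senator+stats1, z1=null]
    }
}
```

With one key in A and 1,000 zip codes in B, only three records reach the changelog. The price is the extra key set, which is roughly the state the operator would need to tell apart the two cases described in the reply above.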

On Tue, Jul 5, 2016 at 12:44 AM, Guozhang Wang  wrote:

Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table

2016-07-04 Guozhang Wang
Hello,

KTable join semantics are not exactly the same as those of an RDBMS. You
can find the detailed semantics in the web docs (search for "Joining Streams"):

http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl

In a nutshell, for an inner / left / outer join, the joiner is triggered only
if both / the left / either of the joining streams has a record matching the
key of the incoming record (so the joiner's input values can never be null /
only the right-hand value can be null / either value can be null, but not
both); otherwise a pair of {join-key, null} is output. We made this design
choice deliberately to make sure that "table-table joins are eventually
consistent". This gives a kind of resilience to late-arriving records: a
late arrival in either stream can "update" the join result.
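
Reading the "both / left / either" rule above as applying to inner, left and outer joins respectively, the decision can be summarized in a small Java sketch. The enum and method names are hypothetical, not the Kafka Streams API; leftValue and rightValue stand for the current values for the incoming record's key in each input table, with null meaning no record.

```java
import java.util.function.BiFunction;

// Hypothetical summary of when a KTable-KTable joiner is invoked.
class KTableJoinRules {
    enum JoinType { INNER, LEFT, OUTER }

    static <L, R, V> V joinResult(JoinType type, L leftValue, R rightValue,
                                  BiFunction<L, R, V> joiner) {
        boolean trigger;
        switch (type) {
            case INNER: trigger = leftValue != null && rightValue != null; break;
            case LEFT:  trigger = leftValue != null; break;
            case OUTER: trigger = leftValue != null || rightValue != null; break;
            default: throw new IllegalArgumentException("unknown join type: " + type);
        }
        // When the joiner is not triggered, the output is {join-key, null}.
        return trigger ? joiner.apply(leftValue, rightValue) : null;
    }

    public static void main(String[] args) {
        BiFunction<String, String, String> joiner = (l, r) -> l + "|" + r;
        System.out.println(joinResult(JoinType.INNER, "a", null, joiner)); // null
        System.out.println(joinResult(JoinType.LEFT, "a", null, joiner));  // a|null
        System.out.println(joinResult(JoinType.LEFT, null, "b", joiner));  // null
        System.out.println(joinResult(JoinType.OUTER, null, "b", joiner)); // null|b
    }
}
```

The null returned for a non-triggered key is what lets a later arrival on either side overwrite or retract the earlier join result.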


Guozhang

On Mon, Jul 4, 2016 at 6:10 PM, Philippe Derome  wrote:

-- 
-- Guozhang


Re: KStream: KTable-KTable leftJoin with key only on RHS of join generates null in joined table

2016-07-04 Philippe Derome
The same happens for a regular join: keys that appear in only one of the two
streams (either one) make it into the output KTable tC with a null value. I
guess it's related to KAFKA-3911 (Enforce KTable Materialization) or the
umbrella JIRA KAFKA-3909 (Queryable state for Kafka Streams)?

On Mon, Jul 4, 2016 at 8:45 PM, Philippe Derome  wrote:

> If we have two streams A and B with associated tables tA and tB, create a
> table tC as tA.leftJoin(tB, ), and then have a key kB in stream B that
> never made it into tA or tC, do we need to inject a pair (k,v) of
> (kB, null) into the resulting changelog for tC?
>
> It sounds like it is definitely necessary if key kB is present in table tC,
> but if not, why add it?
>
> I have an example that reproduces this and would like to know if it is
> considered normal, sub-optimal, or a defect. I don't view it as normal for
> the time being, particularly considering that stream A has very few keys
> and B has many, which could lead to an unnecessarily large changelog for tC.
>
> Phil
>
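
A rough Java tally of the concern, assuming (as reported in this message) that every tB update whose key has no match in tA forwards (kB, null) to tC's changelog. The class and method are hypothetical; the point is that the number of null records grows with the number of B keys, not with the number of keys that actually join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical tally for tA.leftJoin(tB) under the behaviour reported in this
// thread: every tB update whose key has no tA match forwards (kB, null) to tC.
class ChangelogGrowthSketch {
    // Returns {joined records, null records} forwarded for the given tB updates.
    static long[] tally(Set<String> keysInA, List<String> bUpdateKeys) {
        long joined = 0;
        long nulls = 0;
        for (String key : bUpdateKeys) {
            if (keysInA.contains(key)) joined++; else nulls++;
        }
        return new long[] {joined, nulls};
    }

    public static void main(String[] args) {
        // Stream A: very few keys.
        Set<String> keysInA = new HashSet<>(Arrays.asList("k1", "k2", "k3"));
        // Stream B: many keys, one update each.
        List<String> bUpdates = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) bUpdates.add("k" + i);
        long[] t = tally(keysInA, bUpdates);
        System.out.println("joined=" + t[0] + ", nulls=" + t[1]); // joined=3, nulls=99997
    }
}
```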