Re: Handling of nulls in KTable groupBy

2016-06-09 Thread Guozhang Wang
Hello Jeff,

Yes, outputting null when there is no match is by design: we intentionally
differ from the join semantics of an RDBMS, where tables are "static",
whereas in Kafka Streams "KTables are continuously evolving / being
updated". In fact, the semantics of inner / left / outer joins are: "The
ValueJoiner will be triggered when both / left / either of the joining
streams have a match on the key of a newly arrived record; otherwise null
is output as the value".
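To make the emission rule above concrete, here is a small plain-Java model. It is an assumption-laden sketch, not the Kafka Streams implementation: the class `TableJoinModel`, its `Mode` enum and the `updateLeft`/`updateRight` methods are hypothetical, and each side of the join is just an in-memory map. Every update on either side produces exactly one output record, whose value is the joiner's result on a match and null otherwise.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory model of a continuously updated table-table join.
// It is NOT the Kafka Streams API; it only mimics the emission rule quoted above.
class TableJoinModel<K, L, R, V> {
    enum Mode { INNER, LEFT, OUTER }

    private final Map<K, L> left = new HashMap<>();
    private final Map<K, R> right = new HashMap<>();
    private final Mode mode;
    private final BiFunction<L, R, V> joiner;
    // One output record per update on either side; a null value means "no match".
    final List<Map.Entry<K, V>> output = new ArrayList<>();

    TableJoinModel(Mode mode, BiFunction<L, R, V> joiner) {
        this.mode = mode;
        this.joiner = joiner;
    }

    void updateLeft(K key, L value) {
        left.put(key, value);
        emit(key);
    }

    void updateRight(K key, R value) {
        right.put(key, value);
        emit(key);
    }

    private void emit(K key) {
        L l = left.get(key);
        R r = right.get(key);
        boolean match;
        switch (mode) {
            case INNER: match = l != null && r != null; break; // both sides present
            case LEFT:  match = l != null; break;              // left side present
            default:    match = l != null || r != null; break; // either side present
        }
        // The joiner runs only on a match; otherwise the update still yields a record with a null value.
        output.add(new AbstractMap.SimpleEntry<>(key, match ? joiner.apply(l, r) : null));
    }

    public static void main(String[] args) {
        TableJoinModel<Integer, Integer, String, String> join =
            new TableJoinModel<>(Mode.INNER, (accountId, userId) -> accountId + ":" + userId);
        join.updateLeft(1, 100);      // right side missing -> (1, null)
        join.updateRight(1, "alice"); // both present       -> (1, "100:alice")
        join.updateRight(2, "bob");   // left side missing  -> (2, null)
        System.out.println(join.output); // prints [1=null, 1=100:alice, 2=null]
    }
}
```

In the inner-join demo only the second update carries a joined value; the other two are exactly the null records that a downstream groupBy has to cope with.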

So back to your use case: I think this is actually a bug in
KTableRepartitionMap, which should expect null grouped keys. This would be a
straightforward fix for this operator, but I can also make a pass over all
the repartition operators to make sure they all handle null keys
gracefully.
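A minimal sketch of the null handling described above, assuming a simplified repartition step that receives one (key, value) pair at a time. `RepartitionMapModel`, `mapUnguarded` and `mapGuarded` are hypothetical names, and the real KTableRepartitionMap also forwards old values for the subtractor side, which this model leaves out. The unguarded version fails the same way as the reported NPE when a null join result is passed straight through the mapper; the guarded version simply produces no output.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical model of the groupBy/repartition "map" step on a table changelog.
// Not Kafka's KTableRepartitionMap: old values and subtractors are ignored entirely.
class RepartitionMapModel {

    // Unguarded: dereferences whatever the user's mapper returns. If the join emitted a
    // null value and the mapper passes it through (as in `kv -> kv`), this throws an NPE.
    static <K, V, K1, V1> Map.Entry<K1, V1> mapUnguarded(
            K key, V value, BiFunction<K, V, Map.Entry<K1, V1>> mapper) {
        Map.Entry<K1, V1> pair = mapper.apply(key, value);
        return new AbstractMap.SimpleEntry<>(pair.getKey(), pair.getValue());
    }

    // Guarded: a null value, a null mapped pair, or a null grouped key is treated as
    // "nothing to repartition" and produces no output instead of failing.
    static <K, V, K1, V1> List<Map.Entry<K1, V1>> mapGuarded(
            K key, V value, BiFunction<K, V, Map.Entry<K1, V1>> mapper) {
        List<Map.Entry<K1, V1>> out = new ArrayList<>();
        if (value == null) {
            return out;
        }
        Map.Entry<K1, V1> pair = mapper.apply(key, value);
        if (pair == null || pair.getKey() == null) {
            return out;
        }
        out.add(pair);
        return out;
    }

    public static void main(String[] args) {
        BiFunction<Integer, Map.Entry<Integer, String>, Map.Entry<Integer, String>> passThrough =
            (customerId, kv) -> kv;
        Map.Entry<Integer, String> joined = new AbstractMap.SimpleEntry<>(100, "alice");
        System.out.println(mapGuarded(1, joined, passThrough)); // prints [100=alice]
        System.out.println(mapGuarded(2, null, passThrough));   // prints [] (record skipped)
        try {
            mapUnguarded(2, null, passThrough);
        } catch (NullPointerException e) {
            System.out.println("unguarded step threw NPE");
        }
    }
}
```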

Would you want to file a JIRA? Or I can do it.

Guozhang

On Wed, Jun 8, 2016 at 6:47 PM, Jeff Klukas  wrote:

> Doing some more investigation into this, the KTable-KTable inner join is
> indeed emitting records on every update of either KTable. If there is no
> match found, the record that's emitted is null. This may be a conscious
> design decision due to the continuous nature of the join, although I'd love
> to hear confirmation or commentary on that.
>
> Assuming the above is true, I think a KTable-KTable join followed by a
> groupBy is simply not possible.
>
> I discovered a different approach which seems roundabout, but appears to
> work. I can convert the joined KTable to a KStream, filter out null values,
> and then use a KStream.map operation to change the key (rather than
> repartitioning via groupBy). Finally, reduceByKey can get us back to a
> KTable:
>
> table1.join(table2, joiner).toStream()
>     .filterNot((k, v) -> k == null || v == null)
>     .map( ... ).reduceByKey( ... )
>
> Is it necessary to convert to a KStream in order to filter out the null
> join values?


-- 
-- Guozhang


Re: Handling of nulls in KTable groupBy

2016-06-08 Thread Jeff Klukas
Doing some more investigation into this, the KTable-KTable inner join is
indeed emitting records on every update of either KTable. If there is no
match found, the record that's emitted is null. This may be a conscious
design decision due to the continuous nature of the join, although I'd love
to hear confirmation or commentary on that.

Assuming the above is true, I think a KTable-KTable join followed by a
groupBy is simply not possible.

I discovered a different approach which seems roundabout, but appears to
work. I can convert the joined KTable to a KStream, filter out null values,
and then use a KStream.map operation to change the key (rather than
repartitioning via groupBy). Finally, reduceByKey can get us back to a
KTable:

table1.join(table2, joiner).toStream()
    .filterNot((k, v) -> k == null || v == null)
    .map( ... ).reduceByKey( ... )
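The workaround's filter / re-key / reduce shape can be modeled in plain Java as below. This is a sketch under stated assumptions, not Kafka Streams code: `RekeyWorkaroundModel.filterRekeyReduce` is a hypothetical helper, the joined changelog is a list of (customerId, (accountId, userId)) entries, and the reducer keeps the latest value per new key.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical in-memory model of the workaround's shape:
//   join(...).toStream().filterNot(null key or value).map(rekey).reduceByKey(reducer)
// A plain-Java illustration only, not Kafka Streams code.
class RekeyWorkaroundModel {

    static <K, V, K1, V1> Map<K1, V1> filterRekeyReduce(
            List<Map.Entry<K, V>> joinedChangelog,
            Function<Map.Entry<K, V>, Map.Entry<K1, V1>> rekey,
            BinaryOperator<V1> reducer) {
        Map<K1, V1> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : joinedChangelog) {
            // filterNot((k, v) -> k == null || v == null): drop "no match" records
            if (record.getKey() == null || record.getValue() == null) {
                continue;
            }
            // map(...): pick the new key (and value) for each record
            Map.Entry<K1, V1> rekeyed = rekey.apply(record);
            // reduceByKey(...): fold the new value into the current one for that key
            table.merge(rekeyed.getKey(), rekeyed.getValue(), reducer);
        }
        return table;
    }

    public static void main(String[] args) {
        // customerId -> (accountId, userId), or null when the join found no match
        List<Map.Entry<Integer, Map.Entry<Integer, String>>> changelog = new ArrayList<>();
        changelog.add(new AbstractMap.SimpleEntry<Integer, Map.Entry<Integer, String>>(1, null));
        changelog.add(new AbstractMap.SimpleEntry<Integer, Map.Entry<Integer, String>>(
            1, new AbstractMap.SimpleEntry<>(100, "alice")));
        changelog.add(new AbstractMap.SimpleEntry<Integer, Map.Entry<Integer, String>>(
            2, new AbstractMap.SimpleEntry<>(200, "bob")));
        changelog.add(new AbstractMap.SimpleEntry<Integer, Map.Entry<Integer, String>>(2, null));

        Map<Integer, String> accountToUser =
            filterRekeyReduce(changelog, record -> record.getValue(), (oldUser, newUser) -> newUser);
        System.out.println(accountToUser); // prints {100=alice, 200=bob}
    }
}
```

One design consequence shows up in the demo: the filter drops every null record, including the last one, which says customer 2 no longer has a match, so account 200 keeps its old entry. Presumably the stream-based workaround has the same limitation, since those null records never reach reduceByKey.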

Is it necessary to convert to a KStream in order to filter out the null
join values?

-- Forwarded message --
From: Jeff Klukas 
To: users@kafka.apache.org
Cc: Guozhang Wang 
Date: Wed, 8 Jun 2016 10:56:26 -0400
Subject: Handling of nulls in KTable groupBy
I have a seemingly simple case where I want to join two KTables to produce
a new table with a different key, but I am getting NPEs. My understanding
is that to change the key of a KTable, I need to do a groupBy and a reduce.

What I believe is going on is that the inner join operation is emitting
nulls in the case that no matching record is found in one of the source
KTables. The groupBy operation then receives null inputs that it's not
expecting.

Here is the snippet of code where I define the join and the groupBy:

customerIdToAccountIdLookup.join(customerIdToUserIdLookup,
    (Integer accountId, String userId) -> {
        return new KeyValue<>(accountId, userId);
    })
    .groupBy((Integer customerId, KeyValue<Integer, String> kv) -> {
        return kv;
    }, Serdes.Integer(), Serdes.String())

This produces the following exception:

! java.lang.NullPointerException: null
! at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)

Am I approaching this incorrectly, or is there a bug going on? Should a
KTable-KTable inner join be emitting records when no match is found?


Handling of nulls in KTable groupBy

2016-06-08 Thread Jeff Klukas
I have a seemingly simple case where I want to join two KTables to produce
a new table with a different key, but I am getting NPEs. My understanding
is that to change the key of a KTable, I need to do a groupBy and a reduce.
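Re-keying a table needs a groupBy plus a reduce, rather than a plain map, because every table update must be retracted from its old group before it is added to its new one; as far as I know, KGroupedTable's reduce in the releases discussed here takes both an adder and a subtractor for this purpose. The sketch below models that idea in plain Java under simplifying assumptions: `RekeyTableModel` and its fields are hypothetical, the source table maps customerId to accountId, and the aggregate is a count of customers per account.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of re-keying a table with groupBy + reduce(adder, subtractor).
// Source table: customerId -> accountId. Re-keyed table: accountId -> number of customers.
class RekeyTableModel {
    private final Map<Integer, Integer> customerToAccount = new HashMap<>();
    final Map<Integer, Long> customersPerAccount = new HashMap<>();

    private static final BinaryOperator<Long> ADDER = (agg, v) -> agg + v;
    private static final BinaryOperator<Long> SUBTRACTOR = (agg, v) -> agg - v;

    // accountId == null models a deletion (tombstone) in the source table.
    void update(int customerId, Integer accountId) {
        Integer oldAccount = accountId == null
            ? customerToAccount.remove(customerId)
            : customerToAccount.put(customerId, accountId);
        if (oldAccount != null) {
            // Retract the previous row from the group it used to belong to.
            customersPerAccount.put(oldAccount,
                SUBTRACTOR.apply(customersPerAccount.get(oldAccount), 1L));
        }
        if (accountId != null) {
            // Add the new row to its (possibly different) group.
            customersPerAccount.merge(accountId, 1L, ADDER);
        }
    }

    public static void main(String[] args) {
        RekeyTableModel model = new RekeyTableModel();
        model.update(1, 100);
        model.update(2, 100);
        model.update(1, 200);  // customer 1 moves: account 100 loses one, 200 gains one
        model.update(2, null); // customer 2 deleted: account 100 loses one more
        System.out.println(model.customersPerAccount);
    }
}
```

After these four updates the model holds 0 for account 100 and 1 for account 200; without the subtractor step, account 100 would still report two customers.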

What I believe is going on is that the inner join operation is emitting
nulls in the case that no matching record is found in one of the source
KTables. The groupBy operation then receives null inputs that it's not
expecting.

Here is the snippet of code where I define the join and the groupBy:

customerIdToAccountIdLookup.join(customerIdToUserIdLookup,
    (Integer accountId, String userId) -> {
        return new KeyValue<>(accountId, userId);
    })
    .groupBy((Integer customerId, KeyValue<Integer, String> kv) -> {
        return kv;
    }, Serdes.Integer(), Serdes.String())

This produces the following exception:

! java.lang.NullPointerException: null
! at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)

Am I approaching this incorrectly, or is there a bug going on? Should a
KTable-KTable inner join be emitting records when no match is found?