Re: Handling of nulls in KTable groupBy
Hello Jeff,

Yes, emitting null upon no-match is by design: we are intentionally differentiating from the join semantics of an RDBMS, where tables are "static", whereas in Kafka Streams "KTables are continuously evolving / being updated". In fact, the semantics of inner / left / outer joins are: "The ValueJoiner will be triggered when both / left / either of the joining streams have matching on the key upon newly arrived records, otherwise output null as the value".

So, back to your use case: I think this is actually a bug in KTableRepartitionMap, which should expect null grouped keys. This would be a straightforward fix for this operator, but I can make a pass over all the repartition operators to make sure they all handle null keys gracefully.

Would you want to file a JIRA? Or I can do it.

Guozhang

On Wed, Jun 8, 2016 at 6:47 PM, Jeff Klukas wrote:

> Doing some more investigation into this, the KTable-KTable inner join is
> indeed emitting records on every update of either KTable. If there is no
> match found, the record that's emitted is null. This may be a conscious
> design decision due to the continuous nature of the join, although I'd love
> to hear confirmation or commentary on that.
>
> Assuming the above is true, I think a KTable-KTable join followed by a
> groupBy is simply not possible.
>
> I discovered a different approach which seems roundabout, but appears to
> work. I can convert the joined KTable to a KStream, filter out null values,
> and then use a KStream.map operation to change the key (rather than
> repartitioning via groupBy). Finally, reduceByKey can get us back to a
> KTable:
>
>     table1.join(table2, joiner).toStream().filterNot((k, v) -> k == null || v == null).map( ... ).reduceByKey( ... )
>
> Is it necessary to convert to a KStream in order to filter out the null
> join values?
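The join behavior described in this reply (every update to either table emits a record for that key, with null as the value when the other side has no match) can be modeled in plain Java. This is only a sketch under stated assumptions: it does not use the Kafka Streams API, and the class name InnerJoinSketch, its methods, and the string joiner are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of a continuously updated inner join between two tables.
// NOT Kafka Streams code; every name here is hypothetical.
class InnerJoinSketch {
    private final Map<Integer, Integer> left = new HashMap<>();  // customerId -> accountId
    private final Map<Integer, String> right = new HashMap<>();  // customerId -> userId
    private final BiFunction<Integer, String, String> joiner;
    final List<String> emitted = new ArrayList<>();              // "key=value" per emitted record

    InnerJoinSketch(BiFunction<Integer, String, String> joiner) {
        this.joiner = joiner;
    }

    void updateLeft(Integer key, Integer value) {
        left.put(key, value);
        emit(key);
    }

    void updateRight(Integer key, String value) {
        right.put(key, value);
        emit(key);
    }

    // Every update emits a record for the updated key: the joined value when
    // both sides hold the key, otherwise null.
    private void emit(Integer key) {
        Integer l = left.get(key);
        String r = right.get(key);
        String joined = (l != null && r != null) ? joiner.apply(l, r) : null;
        emitted.add(key + "=" + joined);
    }

    public static void main(String[] args) {
        InnerJoinSketch join =
            new InnerJoinSketch((accountId, userId) -> accountId + ":" + userId);
        join.updateLeft(1, 100);       // no userId for key 1 yet, so null is emitted
        join.updateRight(1, "alice");  // both sides match, so the joiner runs
        join.updateRight(2, "bob");    // no accountId for key 2, so null is emitted
        System.out.println(join.emitted);  // prints [1=null, 1=100:alice, 2=null]
    }
}
```

Any operator placed downstream of such a join therefore has to tolerate null values, which is the case the groupBy in this thread did not handle.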
Re: Handling of nulls in KTable groupBy
Doing some more investigation into this, the KTable-KTable inner join is indeed emitting records on every update of either KTable. If there is no match found, the record that's emitted is null. This may be a conscious design decision due to the continuous nature of the join, although I'd love to hear confirmation or commentary on that.

Assuming the above is true, I think a KTable-KTable join followed by a groupBy is simply not possible.

I discovered a different approach which seems roundabout, but appears to work. I can convert the joined KTable to a KStream, filter out null values, and then use a KStream.map operation to change the key (rather than repartitioning via groupBy). Finally, reduceByKey can get us back to a KTable:

    table1.join(table2, joiner).toStream().filterNot((k, v) -> k == null || v == null).map( ... ).reduceByKey( ... )

Is it necessary to convert to a KStream in order to filter out the null join values?

-- Forwarded message --
From: Jeff Klukas
To: users@kafka.apache.org
Cc: Guozhang Wang
Date: Wed, 8 Jun 2016 10:56:26 -0400
Subject: Handling of nulls in KTable groupBy

I have a seemingly simple case where I want to join two KTables to produce a new table with a different key, but I am getting NPEs. My understanding is that to change the key of a KTable, I need to do a groupBy and a reduce.

What I believe is going on is that the inner join operation is emitting nulls in the case that no matching record is found in one of the source KTables. The groupBy operation then receives null inputs that it's not expecting.

Here is the snippet of code where I define the join and the groupBy:

    customerIdToAccountIdLookup.join(customerIdToUserIdLookup,
        (Integer accountId, String userId) -> {
            return new KeyValue<>(accountId, userId);
        })
        .groupBy((Integer customerId, KeyValue kv) -> {
            return kv;
        }, Serdes.Integer(), Serdes.String())

This produces the following exception:

    ! java.lang.NullPointerException: null
    ! at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)

Am I approaching this incorrectly, or is there a bug going on? Should a KTable-KTable inner join be emitting records when no match is found?
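The three steps of the workaround above (drop null records, re-key, reduce by the new key) can be sketched without Kafka as a loop over the join's output records. The code below is an illustration under stated assumptions, not the Kafka Streams API: the KV holder class, the rekey method, and the "keep the latest value" reducer are all hypothetical, since the original post elides the arguments to map and reduceByKey.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of: toStream().filterNot(nulls).map(rekey).reduceByKey(...)
// NOT Kafka Streams code; every name here is hypothetical.
class RekeyWorkaroundSketch {
    // Minimal key/value holder standing in for a stream record.
    static final class KV<K, V> {
        final K key;
        final V value;

        KV(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Input: join output keyed by customerId, whose value is an
    // (accountId, userId) pair, or null when the join found no match.
    // Output: a table keyed by accountId.
    static Map<Integer, String> rekey(List<KV<Integer, KV<Integer, String>>> joinOutput) {
        Map<Integer, String> table = new TreeMap<>();
        for (KV<Integer, KV<Integer, String>> record : joinOutput) {
            // filterNot step: drop records with a null key or a null value.
            if (record.key == null || record.value == null) {
                continue;
            }
            // map step: the joined pair becomes the new (key, value).
            KV<Integer, String> rekeyed = record.value;
            // reduceByKey step: the reducer is assumed to keep the newest value.
            table.merge(rekeyed.key, rekeyed.value, (older, newer) -> newer);
        }
        return table;
    }
}
```

One consequence visible in this model: because null records are dropped before the reduce, a later no-match for a customerId never removes the entry that was already written under its accountId.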
Handling of nulls in KTable groupBy
I have a seemingly simple case where I want to join two KTables to produce a new table with a different key, but I am getting NPEs. My understanding is that to change the key of a KTable, I need to do a groupBy and a reduce.

What I believe is going on is that the inner join operation is emitting nulls in the case that no matching record is found in one of the source KTables. The groupBy operation then receives null inputs that it's not expecting.

Here is the snippet of code where I define the join and the groupBy:

    customerIdToAccountIdLookup.join(customerIdToUserIdLookup,
        (Integer accountId, String userId) -> {
            return new KeyValue<>(accountId, userId);
        })
        .groupBy((Integer customerId, KeyValue kv) -> {
            return kv;
        }, Serdes.Integer(), Serdes.String())

This produces the following exception:

    ! java.lang.NullPointerException: null
    ! at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:88)

Am I approaching this incorrectly, or is there a bug going on? Should a KTable-KTable inner join be emitting records when no match is found?
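A plausible reading of the failure is that the groupBy selector in the snippet returns its input unchanged, so a null join value comes back as a null KeyValue that is then dereferenced. The sketch below models that shape in plain Java and shows a guarded variant. It is an assumption-laden illustration: it is not the code at KTableRepartitionMap.java:88, it ignores how Kafka Streams handles old and new values, and NullGuardSketch, KV, and both methods are hypothetical names.

```java
import java.util.Optional;
import java.util.function.BiFunction;

// Plain-Java model of a key-changing step applied to a record whose value may be null.
// NOT the Kafka Streams implementation; every name here is hypothetical.
class NullGuardSketch {
    // Minimal key/value holder standing in for KeyValue.
    static final class KV<K, V> {
        final K key;
        final V value;

        KV(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Unguarded: dereferences whatever the selector returns, so a selector
    // that returns null triggers a NullPointerException.
    static <K, V, K1, V1> K1 newKeyUnguarded(
            KV<K, V> record, BiFunction<K, V, KV<K1, V1>> selector) {
        return selector.apply(record.key, record.value).key;
    }

    // Guarded: yields an empty Optional when the value is null or when the
    // selector produces no new key/value pair.
    static <K, V, K1, V1> Optional<K1> newKeyGuarded(
            KV<K, V> record, BiFunction<K, V, KV<K1, V1>> selector) {
        if (record.value == null) {
            return Optional.empty();
        }
        KV<K1, V1> mapped = selector.apply(record.key, record.value);
        if (mapped == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(mapped.key);
    }
}
```

With a selector of the form (customerId, kv) -> kv, as in the snippet above, the unguarded method throws on a record whose value is null, while the guarded method returns an empty Optional.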