Phil Derome created KAFKA-3891:
----------------------------------
Summary: A KTable with Long values with a numeric filter apparently may retain null values
Key: KAFKA-3891
URL: https://issues.apache.org/jira/browse/KAFKA-3891
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 0.10.0.0
Reporter: Phil Derome
Assignee: Guozhang Wang
Priority: Minor
See Confluent's UserRegionLambdaExample for full details. I am not sure whether this
qualifies as a bug, since I am new to the community, but it looks like one to me
(the resolved issues KAFKA-739 and KAFKA-2026 also concern unwanted nulls, and they
were deemed Major bugs).
The first filter below, applied to the KTable of counts, should remove nulls,
since null does not satisfy the predicate count >= 2.
Yet the variable regionCounts apparently still contains some null values, judging
by the second filter (count != null) that is applied downstream. This is quite
confusing: given that the first filter's intent is clear, why would we want to
publish these null values to any topic?
// Aggregate the user counts by region
KTable<String, Long> regionCounts = userRegions
    // Count by region
    // We do not need to specify any explicit serdes because the key and value types do not change
    .groupBy((userId, region) -> KeyValue.pair(region, region))
    .count("CountsByRegion")
    // discard any regions with only 1 user
    .filter((regionName, count) -> count >= 2);

// Note: The following operations would NOT be needed for the actual users-per-region
// computation, which would normally stop at the filter() above. We use the operations
// below only to "massage" the output data so it is easier to inspect on the console via
// kafka-console-consumer.
//
KStream<String, Long> regionCountsForConsole = regionCounts
    // get rid of windows (and the underlying KTable) by transforming the KTable to a KStream
    .toStream()
    // sanitize the output by removing null record values (again, we do this only so that the
    // output is easier to read via kafka-console-consumer combined with LongDeserializer
    // because LongDeserializer fails on null values, and even though we could configure
    // kafka-console-consumer to skip messages on error the output still wouldn't look pretty)
    .filter((regionName, count) -> count != null);

// write to the result topic; we need to override the value serializer for type Long
regionCountsForConsole.to(stringSerde, longSerde, "LargeRegions");
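To make the behavior concrete, here is a minimal plain-Java model (no Kafka dependency) of the two filters in the snippet above. It assumes the changelog semantics described in the KTable#filter Javadoc of later Kafka releases: an update that fails the predicate is forwarded downstream as a tombstone (a null value) rather than dropped, so that a key which previously passed can be deleted from downstream state. The class and method names (KTableFilterModel, tableFilter, streamFilter) and the sample counts are hypothetical, and the real implementation may suppress some redundant tombstones; treat this as a sketch, not Kafka's code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical model class; not part of Kafka Streams.
public class KTableFilterModel {

    // Helper to build one (region, count) update; a null count models a tombstone.
    static Map.Entry<String, Long> update(String region, Long count) {
        return new SimpleEntry<>(region, count);
    }

    // Simplified model of a changelog (KTable) filter: an update that fails the
    // predicate is forwarded as a tombstone (null value) instead of being dropped.
    // Incoming tombstones are forwarded as tombstones without evaluating the predicate.
    static List<Map.Entry<String, Long>> tableFilter(
            List<Map.Entry<String, Long>> changelog, BiPredicate<String, Long> predicate) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (Map.Entry<String, Long> u : changelog) {
            Long value = u.getValue();
            boolean keep = value != null && predicate.test(u.getKey(), value);
            out.add(update(u.getKey(), keep ? value : null));
        }
        return out;
    }

    // Simplified model of a record-stream (KStream) filter: failing records are dropped.
    static List<Map.Entry<String, Long>> streamFilter(
            List<Map.Entry<String, Long>> records, BiPredicate<String, Long> predicate) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (Map.Entry<String, Long> r : records) {
            if (predicate.test(r.getKey(), r.getValue())) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical count updates: europe reaches 2 users, then one user leaves
        List<Map.Entry<String, Long>> counts = Arrays.asList(
                update("europe", 1L), update("europe", 2L),
                update("asia", 1L), update("europe", 1L));

        // Mirrors .filter((regionName, count) -> count >= 2) on the KTable
        List<Map.Entry<String, Long>> large = tableFilter(counts, (region, count) -> count >= 2);
        System.out.println("KTable filter:     " + large);

        // Mirrors .toStream().filter((regionName, count) -> count != null)
        List<Map.Entry<String, Long>> forConsole =
                streamFilter(large, (region, count) -> count != null);
        System.out.println("After null filter: " + forConsole);
    }
}
```

Under this model, main prints [europe=null, europe=2, asia=null, europe=null] for the table filter and [europe=2] after the null filter: every update that fails count >= 2 reaches the stream as a null value, which is exactly what the second, console-only filter removes before the records are written with a Long serde.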