Phil Derome created KAFKA-3891:
----------------------------------

             Summary: A KTable with Long values with a numeric filter apparently may retain null values
                 Key: KAFKA-3891
                 URL: https://issues.apache.org/jira/browse/KAFKA-3891
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 0.10.0.0
            Reporter: Phil Derome
            Assignee: Guozhang Wang
            Priority: Minor


See Confluent's UserRegionLambdaExample for full details. I am not sure whether this
qualifies as a bug, as I am new to the community, but it looks like one to me
(the resolved issues KAFKA-739 and KAFKA-2026 also concern undesirable nulls, and
they were classified as Major bugs).

The first filter on the KTable of counts below should filter out null values,
since null does not satisfy the predicate count >= 2.
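A side note on plain Java semantics: evaluating the predicate `count >= 2` on a null `Long` does not return false; it auto-unboxes `count` and throws a NullPointerException. The minimal stand-alone sketch below (no Kafka dependency; the class and member names are hypothetical) contrasts that predicate with a null-safe variant. It only illustrates Java behaviour and does not show how `KTable.filter` itself treats null values.

```java
import java.util.function.BiPredicate;

public class NullSafePredicate {
    // Same shape as the first filter in the report. With a null Long, the
    // comparison unboxes count and throws NullPointerException instead of
    // returning false.
    static final BiPredicate<String, Long> AT_LEAST_TWO =
            (regionName, count) -> count >= 2;

    // Null-safe variant: a null count simply "does not satisfy" the predicate.
    static final BiPredicate<String, Long> AT_LEAST_TWO_NULL_SAFE =
            (regionName, count) -> count != null && count >= 2;

    // Returns true if the plain predicate throws when given a null count.
    static boolean throwsOnNull() {
        try {
            AT_LEAST_TWO.test("europe", null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("plain predicate throws on null: " + throwsOnNull());
        System.out.println("null-safe predicate on null: "
                + AT_LEAST_TWO_NULL_SAFE.test("europe", null));
        System.out.println("null-safe predicate on 3: "
                + AT_LEAST_TWO_NULL_SAFE.test("europe", 3L));
    }
}
```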

The variable regionCounts apparently contains some null values despite the filter
on count, judging by the second (null-removing) filter applied afterwards. This is
quite confusing. Why would we want to publish these null values to any topic when
the filter's intent is clear?

    // Aggregate the user counts by region
    KTable<String, Long> regionCounts = userRegions
        // Count by region
        // We do not need to specify any explicit serdes because the key and
        // value types do not change
        .groupBy((userId, region) -> KeyValue.pair(region, region))
        .count("CountsByRegion")
        // discard any regions with only 1 user
        .filter((regionName, count) -> count >= 2);

    // Note: The following operations would NOT be needed for the actual users-per-region
    // computation, which would normally stop at the filter() above. We use the operations
    // below only to "massage" the output data so it is easier to inspect on the console
    // via kafka-console-consumer.
    //
    //
    KStream<String, Long> regionCountsForConsole = regionCounts
        // get rid of windows (and the underlying KTable) by transforming the KTable to a KStream
        .toStream()
        // sanitize the output by removing null record values (again, we do this only so that
        // the output is easier to read via kafka-console-consumer combined with LongDeserializer,
        // because LongDeserializer fails on null values, and even though we could configure
        // kafka-console-consumer to skip messages on error, the output still wouldn't look pretty)
        .filter((regionName, count) -> count != null);

    // write to the result topic; we need to override the value serializer for type Long
    regionCountsForConsole.to(stringSerde, longSerde, "LargeRegions");
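A likely source of the nulls is changelog semantics: a KTable's output is a stream of per-key updates, so a table filter that drops a key has to tell downstream tables to delete any earlier value for that key, which it can do by forwarding a null value (a tombstone). The plain-Java sketch below is a simplified model of that idea, not Kafka Streams code: `ChangelogFilterModel`, `Update`, `filterTable`, `dropNulls` and the sample region counts are all hypothetical, and the model assumes the filter forwards a tombstone for every failing update. `dropNulls` plays the role of the `toStream().filter(count != null)` step above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class ChangelogFilterModel {
    // One changelog record: a key and its latest value; null means "delete this key".
    static final class Update {
        final String key;
        final Long value;

        Update(String key, Long value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    // Simplified table filter: every upstream update yields one downstream update.
    static List<Update> filterTable(List<Update> upstream, BiPredicate<String, Long> predicate) {
        List<Update> downstream = new ArrayList<>();
        for (Update u : upstream) {
            // Null input values are never passed to the predicate (avoids unboxing NPEs).
            boolean passes = u.value != null && predicate.test(u.key, u.value);
            // Failing updates are still forwarded, but as a tombstone for the key.
            downstream.add(new Update(u.key, passes ? u.value : null));
        }
        return downstream;
    }

    // Counterpart of toStream().filter((regionName, count) -> count != null).
    static List<Update> dropNulls(List<Update> updates) {
        List<Update> kept = new ArrayList<>();
        for (Update u : updates) {
            if (u.value != null) {
                kept.add(u);
            }
        }
        return kept;
    }

    // Hypothetical sequence of count updates per region.
    static List<Update> sampleCounts() {
        List<Update> counts = new ArrayList<>();
        counts.add(new Update("europe", 1L));
        counts.add(new Update("asia", 1L));
        counts.add(new Update("europe", 2L));
        counts.add(new Update("asia", 2L));
        counts.add(new Update("asia", 1L)); // a user moved out of asia
        return counts;
    }

    public static void main(String[] args) {
        List<Update> filtered = filterTable(sampleCounts(), (region, count) -> count >= 2);
        System.out.println(filtered);
        System.out.println(dropNulls(filtered));
    }
}
```

Running `main` prints `[europe=null, asia=null, europe=2, asia=2, asia=null]` followed by `[europe=2, asia=2]`. In this model every first count of 1 already produces a tombstone, which would explain why the console output needs the extra null filter even when no region ever shrinks.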




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
