Thanks a lot for the detailed feedback, its clarity, and the reference to
KIP-63, which, however, is for the most part above my head for now.

Having said that, I still hold the view that the behaviour I presented is
undesirable and hard to defend. We may have no choice but to agree to
disagree; it could be a sterile discussion to keep at it, and addressing
KIP-63 and other issues is more important than my brief observation.

What follows supports my point of view that the filter method is not
behaving as expected, and I still think it's a defect. However, I am
guarded in my observation, admitting my status of "total newbie" at stream
processing and Kafka.

If we rewrite the code snippet I provided from

KTable<String, String> regionCounts = userRegions
    .groupBy((userId, region) -> KeyValue.pair(region, region))
    .count("CountsByRegion")
    .filter((regionName, count) -> false)
    .mapValues(count -> count.toString());

to


KTable<String, Long> regionCounts1 = userRegions
    .groupBy((userId, region) -> KeyValue.pair(region, region))
    .count("CountsByRegion");

KTable<String, String> regionCounts = regionCounts1
    .filter((regionName, count) -> false)
    .mapValues(count -> count.toString());


It becomes clear that regionCounts1 could build up plenty of keys with
valid Long counts, which is normal behaviour (I think you call this a node
in the topology in KIP-63, and regionCounts is a successor node).
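
To make that concrete with a hypothetical input (the user/region values
are invented for illustration, not taken from the example's data):

// records arriving on userRegions:
//   ("alice", "europe"), ("bob", "asia"), ("carol", "europe")
// resulting contents of regionCounts1 (the CountsByRegion store):
//   ("europe", 2L), ("asia", 1L)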

These regionCounts1 keys are then exposed as input to the evaluation of
the regionCounts KTable. But why should any key be created in the
regionCounts KTable when the filter evaluates to false? In other words, the
"optimization" seems really compelling here: do not create a key before
that key becomes relevant. The key is valid and relevant in regionCounts1,
but a (key, null) entry is neither valid nor relevant in regionCounts. By a
programming-composition argument, the original block of code I presented
should be equivalent to the one broken down into two blocks here (and I
guess that's saying one unified node in the topology should be equivalent
to the chain of two nodes represented above, if I understand the
terminology right).
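
Under that equivalence I would expect regionCounts, and hence the
LargeRegions topic, to stay empty. What I actually observed (see my first
message below) is a (key, null) record per key instead; continuing the
hypothetical input above:

// expected on LargeRegions with the always-false filter:
//   (nothing)
// observed on LargeRegions:
//   ("europe", null), ("asia", null)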

With an always-false filter, the contents of regionCounts should not
depend on the set of keys present in regionCounts1 if we view this from a
functional programming point of view (otherwise it's as if we were carrying
garbage-collected objects into regionCounts), which seems natural
considering how pervasive the filter method is in FP.
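
For comparison only (plain java.util.stream, not Kafka Streams; just the
analogy I have in mind), an always-false filter leaves no trace of the
rejected elements downstream:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FilterAnalogy {
    public static void main(String[] args) {
        // Rejected elements simply never reach the downstream collection;
        // there is no (element, null) placeholder left behind.
        List<String> largeRegions = Stream.of("europe", "asia", "europe")
            .filter(region -> false)
            .collect(Collectors.toList());
        System.out.println(largeRegions);  // prints []
    }
}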

Here regionCounts is totally oblivious to the fact that an aggregation
took place previously in regionCounts1, and that's fine (KIP-63 talks a lot
about aggregation, but I don't really care about that; I care about the
second node and the behaviour of filter).


On Thu, Jun 23, 2016 at 6:13 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Philippe,
>
> I think your question is really in two-folds:
>
> 1. What is the semantic difference between a KTable and a KStream, and more
> specifically how should we interpret (key, null) in KTable?
>
> You can find some explanations in this documentation:
>
> http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream
>
> Note that KTable itself is still a stream behind the scene, although it may
> be materialized when necessary. And specifically to your question, (key,
> null) can be treated as a tombstone on the specified key, and when this
> KTable stream is materialized, it will result in a "delete" on materialized
> view.
>
>
> 2. As for the "filter" operator, yes it will generate a large amount of
> (key, null) records which indicates "delete" in the resulted KTable, and
> hence large traffic to the piped topic. But we are working on KIP-63 which
> unifies the caching mechanism in the `KTable.to` operator as well so that
> de-duping can be done in this operator and hence the outgoing traffic can
> be largely reduced:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
>
>
> Guozhang
>
>
> On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome <phder...@gmail.com>
> wrote:
>
> > I made a modification of latest Confluent's example
> > UserRegionLambdaExample. See relevant code at end of email.
> >
> > Am I correct in understanding that KTable semantics should be similar to
> a
> > store-backed cache of a view as (per wikipedia on materialized views) or
> > similar to Oracle's materialized views and indexed views? More
> > specifically, I am looking at when a (key, null value) pair can make it
> > into KTable on generating table from a valid KStream with a false filter.
> >
> > Here's relevant code modified from example for which I observed that all
> > keys within userRegions are sent out to topic LargeRegions with a null
> > value. I would think that both regionCounts KTable and topic LargeRegions
> > should be empty so that the cached view agrees with the intended query (a
> > query with an intentional empty result set as the filter is intentionally
> > false as 1 >= 2).
> >
> > I am not sure I understand implications properly as I am new but it seems
> > possible that  a highly selective filter from a large incoming stream
> would
> > result in high memory usage for regionCounts and hence the stream
> > application.
> >
> > KTable<String, *String*> regionCounts = userRegions
> >     // Count by region
> >     // We do not need to specify any explicit serdes because the key
> > and value types do not change
> >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> >     .count("CountsByRegion")
> >     // discard any regions FOR SAKE OF EXAMPLE
> >     .filter((regionName, count) -> *1 >= 2*)
> >     .mapValues(count -> count.toString());
> >
> >
> > KStream<String, *String*> regionCountsForConsole =
> regionCounts.toStream();
> >
> > regionCountsForConsole.to(stringSerde, *stringSerde*, "LargeRegions");
> >
>
>
>
> --
> -- Guozhang
>
