Yes, it looks very good. Your detailed explanation appears compelling
enough to reveal that some of the details of the complexity of a streams
system are probably inherent complexity (not that I dared assume it was
"easy" but I could afford to be conveniently unaware). It took me 30
minutes to grasp this latest response.

There might be a typo in your email for case 3.c.1) as I would think we
should send the most recent pair as opposed to original, in any event it
does not materially impact your presentation.

Your case 3a) is really what triggered my line of questioning and I found
the current behaviour vexing as it may lead to some undesirable and
necessary filter (see Michael G. Noll's fix in UserRegionLambdaExample at
the very end trying to weed out null) used to output to topic to console.
Without looking at design, it seemed self-evident to me that the 3a)
behaviour had to be implemented ( from my point of view with the code
example I was looking at, it simply means never say to delete a key that
was never created, simply don't "create a deleted" key).

Likewise cases 3 b,c look very reasonable.

Just out of curiosity, did you effectively just restate the essence of
KIP-63 in a more approachable language I could understand or is KIP-63
really a different beast?



On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Philippe,
>
> Very good points, let me dump my thoughts about "KTable.filter"
> specifically and how we can improve on that:
>
> 1. Some context: when a KTable participates in a downstream operators (e.g.
> if that operator is an aggregation), then we need to materialize this
> KTable and send both its old value as well as new value as a pair {old ->
> new} to the downstream operator. In practice it usually needs to send the
> pair.
>
> So let's discuss about them separately, take the following example source
> stream for your KTable
>
> <a: 1>, <b: 2>, <a: 3> ...
>
> When the KTable needs to be materialized, it will transform the source
> messages into the pairs of:
>
> <a: {null -> 1}>, <b: {nul -> 2}>, <a: {1 -> 3}>
>
> 2. If "send old value" is not enabled, then when the filter predicate
> returns false, we MUST send a <key: null> to the downstream operator to
> indicate that this key is being filtered in the table. Otherwise, for
> example if your filter is "value < 2", then the updated value <a: 3> will
> just be filtered, resulting in incorrect semantics.
>
> If it returns true we should still send the original <key: value> to
> downstream operators.
>
> 3. If "send old value" is enabled, then there are a couple of cases we can
> consider:
>
>     a. If old value is <key: null> and new value is <key: not-null>, and
> the filter predicate return false for the new value, then in this case it
> is safe to optimize and not returning anything to the downstream operator,
> since in this case we know there is no value for the key previously
> anyways; otherwise we send the original pair.
>
>     b. If old value is <key: not-null> and new value is <key: null>,
> indicating to delete this key, and the filter predicate return false for
> the old value, then in this case it is safe to optimize and not returning
> anything to the downstream operator, since we know that the old value has
> already been filtered in a previous message; otherwise we send the original
> pair.
>
>     c. If both old and new values are not null, and:
>
>
>   1) predicate return true on both, send the original pair;
>
>   2) predicate return false on both, we can optimize and do not send
> anything;
>
>   3) predicate return true on old and false on new, send the key: {old ->
> null};
>
>   4) predicate return false on old and true on new, send the key: {null ->
> new};
>
> Does this sounds good to you?
>
>
> Guozhang
>
>
> On Thu, Jun 23, 2016 at 6:17 PM, Philippe Derome <phder...@gmail.com>
> wrote:
>
> > Thanks a lot for the detailed feedback, its clarity and the reference to
> > KIP-63, which however is for the most part above my head for now.
> >
> > Having said that, I still hold the view that the behaviour I presented is
> > undesirable and hardly defensible and we may have no choice but to agree
> to
> > disagree and it could be a sterile discussion to keep at it and
> addressing
> > KIP-63 and other issues are more important than my brief observation.
> >
> > What follows supports my point of view that the filter method is not
> > behaving as expected and I'd still think it's a defect, however I am
> > guarded with my observation admitting my status of "total newbie" at
> stream
> > processing and Kafka.
> >
> > if we rewrite the code snippet I provided from
> > KTable<String, *String*> regionCounts = userRegions
> >      .groupBy((userId, region) -> KeyValue.pair(region, region))
> >      .count("CountsByRegion")
> >      .filter((regionName, count) -> false)
> >      .mapValues(count -> count.toString());
> >
> > to
> >
> >
> > KTable<String, Long> regionCounts1 = userRegions
> >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> >     .count("CountsByRegion");
> >
> > KTable<String, String> regionCounts = regionCounts1
> >     .filter((regionName, count) -> false)
> >     .mapValues(count -> count.toString());
> >
> >
> > It becomes clear that regionCounts1 could build up plenty of keys with
> > valid Long counts, normal behaviour
> >
> >  (I think you call this a node in the topology in KIP-63 and
> > regionCounts is a successor node).
> >
> > These regionCounts1 keys are then exposed to evaluation of KTable
> > regionCounts as an input. But why should there be any key created in
> > KTable regionCounts that has a false filter? In other words, the
> > "optimization"
> >
> > seems really compelling here: do not create a key before that key
> > becomes relevant. The key with a null value is valid and relevant in
> > regionCounts1 but not regionCounts. By a programming composition
> > argument, the original block
> >
> > of code I presented should be equivalent to the broken down one in two
> > blocks here (and I guess that's saying 1 unified node in the topology
> > should be equivalent to a chain of 2 nodes represented below if I
> > understand the terminology right).
> >
> > The contents of regionCounts should not change depending on the set of
> > keys present in regionCounts1 if we view this
> >
> > from a functional programming point of view (it's as if we are
> > carrying garbage collected objects into regionCounts), which seems
> > natural considering the method filter that is pervasive in FP.
> >
> > Here regionCounts is totally oblivious that aggregation took place
> > previously in regionCounts1 and that's fine (KIP-63 talks much about
> > aggregation but I don't really care about, I care about the 2nd node
> > and the behaviour of filter).
> >
> >
> > On Thu, Jun 23, 2016 at 6:13 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hello Philippe,
> > >
> > > I think your question is really in two-folds:
> > >
> > > 1. What is the semantic difference between a KTable and a KStream, and
> > more
> > > specifically how should we interpret (key, null) in KTable?
> > >
> > > You can find some explanations in this documentation:
> > >
> > >
> >
> http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream
> > >
> > > Note that KTable itself is still a stream behind the scene, although it
> > may
> > > be materialized when necessary. And specifically to your question,
> (key,
> > > null) can be treated as a tombstone on the specified key, and when this
> > > KTable stream is materialized, it will result in a "delete" on
> > materialized
> > > view.
> > >
> > >
> > > 2. As for the "filter" operator, yes it will generate a large amount of
> > > (key, null) records which indicates "delete" in the resulted KTable,
> and
> > > hence large traffic to the piped topic. But we are working on KIP-63
> > which
> > > unifies the caching mechanism in the `KTable.to` operator as well so
> that
> > > de-duping can be done in this operator and hence the outgoing traffic
> can
> > > be largely reduced:
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome <phder...@gmail.com>
> > > wrote:
> > >
> > > > I made a modification of latest Confluent's example
> > > > UserRegionLambdaExample. See relevant code at end of email.
> > > >
> > > > Am I correct in understanding that KTable semantics should be similar
> > to
> > > a
> > > > store-backed cache of a view as (per wikipedia on materialized views)
> > or
> > > > similar to Oracle's materialized views and indexed views? More
> > > > specifically, I am looking at when a (key, null value) pair can make
> it
> > > > into KTable on generating table from a valid KStream with a false
> > filter.
> > > >
> > > > Here's relevant code modified from example for which I observed that
> > all
> > > > keys within userRegions are sent out to topic LargeRegions with a
> null
> > > > value. I would think that both regionCounts KTable and topic
> > LargeRegions
> > > > should be empty so that the cached view agrees with the intended
> query
> > (a
> > > > query with an intentional empty result set as the filter is
> > intentionally
> > > > false as 1 >= 2).
> > > >
> > > > I am not sure I understand implications properly as I am new but it
> > seems
> > > > possible that  a highly selective filter from a large incoming stream
> > > would
> > > > result in high memory usage for regionCounts and hence the stream
> > > > application.
> > > >
> > > > KTable<String, *String*> regionCounts = userRegions
> > > >     // Count by region
> > > >     // We do not need to specify any explicit serdes because the key
> > > > and value types do not change
> > > >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> > > >     .count("CountsByRegion")
> > > >     // discard any regions FOR SAKE OF EXAMPLE
> > > >     .filter((regionName, count) -> *1 >= 2*)
> > > >     .mapValues(count -> count.toString());
> > > >
> > > >
> > > > KStream<String, *String*> regionCountsForConsole =
> > > regionCounts.toStream();
> > > >
> > > > regionCountsForConsole.to(stringSerde, *stringSerde*,
> "LargeRegions");
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Reply via email to