[ https://issues.apache.org/jira/browse/KAFKA-3902?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15359841#comment-15359841 ]

ASF GitHub Bot commented on KAFKA-3902:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/kafka/pull/1556


> Optimize KTable.filter() to reduce unnecessary traffic
> ------------------------------------------------------
>
>                 Key: KAFKA-3902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3902
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Phil Derome
>              Labels: architecture, performance
>             Fix For: 0.10.0.1
>
>
> The {{KTable.filter()}} operator is implemented in {{KTableFilter}} and can be 
> optimized to reduce unnecessary data traffic to downstream operators. More 
> specifically:
> 1. Some context: when a KTable participates in a downstream operator (e.g. 
> an aggregation), we need to materialize this KTable and send both its old 
> value and its new value as a pair {old -> new} to the downstream operator. 
> In practice the pair is usually, but not always, needed, so let's discuss 
> the two cases ("send old value" disabled and enabled) separately. Take the 
> following example source stream for your KTable:
> {{<a: 1>, <b: 2>, <a: 3> ...}}
> When the KTable is materialized, it transforms the source messages into the 
> pairs:
> {{<a: \{null -> 1\}>, <b: \{null -> 2\}>, <a: \{1 -> 3\}>}}
> 2. If "send old value" is not enabled, then when the filter predicate returns 
> false, we MUST send a <key: null> to the downstream operator to indicate that 
> this key is now filtered out of the table. Otherwise, for example if your 
> filter is "value < 2", the updated value <a: 3> would simply be dropped and 
> the downstream operator would keep the stale <a: 1>, resulting in incorrect 
> semantics.
> If it returns true, we send the original <key: value> to downstream 
> operators as usual.
> 3. If "send old value" is enabled, then there are several cases to 
> consider:
>     a. If the old value is <key: null>, the new value is <key: not-null>, and 
> the filter predicate returns false for the new value, it is safe to optimize 
> by sending nothing to the downstream operator, since we know the key had no 
> previous value; otherwise we send the original pair.
>     b. If the old value is <key: not-null> and the new value is <key: null>, 
> indicating that this key is deleted, and the filter predicate returns false 
> for the old value, it is safe to optimize by sending nothing to the 
> downstream operator, since the old value was already filtered out by a 
> previous message; otherwise we send the original pair.
>     c. If both old and new values are not null, and the predicate:
>         1) returns true on both, send the original pair;
>         2) returns false on both, we can optimize and send nothing;
>         3) returns true on old and false on new, send the key: \{old -> null\};
>         4) returns false on old and true on new, send the key: \{null -> new\}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
