[
https://issues.apache.org/jira/browse/KAFKA-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15095499#comment-15095499
]
ASF GitHub Bot commented on KAFKA-3081:
---------------------------------------
GitHub user guozhangwang opened a pull request:
https://github.com/apache/kafka/pull/761
KAFKA-3081: Non-windowed Table Aggregation
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/guozhangwang/kafka K3081
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/761.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #761
----
commit 44a78b023a437690c4ef54b26ce3183bef002876
Author: Guozhang Wang <[email protected]>
Date: 2016-01-11T01:14:27Z
add more serdes
commit 1a3f1fe2b15f5d4e291f694073d7474461b9f35d
Author: Guozhang Wang <[email protected]>
Date: 2016-01-12T22:00:36Z
Merge branch 'trunk' of https://git-wip-us.apache.org/repos/asf/kafka into K3081
commit 84160d0e2874c9bfba10a8dc59a3e44aac423147
Author: Guozhang Wang <[email protected]>
Date: 2016-01-13T01:07:34Z
add table aggregation
commit e08a67867e0de330731914bacd1abc23aa0f3f20
Author: Guozhang Wang <[email protected]>
Date: 2016-01-13T02:34:10Z
add unit test
commit c56b83b61581fa88bb7afbd893baf4adc6542cfe
Author: Guozhang Wang <[email protected]>
Date: 2016-01-13T02:37:54Z
checkstyle fixes
----
> KTable Aggregation Implementation
> ---------------------------------
>
> Key: KAFKA-3081
> URL: https://issues.apache.org/jira/browse/KAFKA-3081
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Fix For: 0.9.1.0
>
>
> We need to add the implementation of the KTable aggregation operation. We
> will translate it into two stages in the underlying topology:
> Stage One:
> 1. No stores attached.
> 2. When receiving the record <K, Change<V>> from the upstream processor, call
> selector.apply on both Change<V>.newValue and Change<V>.oldValue.
> 3. Forward the two resulting messages to an intermediate topic (no compaction)
> with key <agg-key> and value <selected-value, isAdd>, where isAdd is a boolean.
> Stage Two:
> 1. Add a K-V store with format <agg-key> : <agg-value>, using the <K1> ser-de and
> the <T> ser-de.
> 2. Upon consuming a record from the intermediate topic:
> 2.1. First try to fetch the aggregate from the store; if it does not exist, call
> initialValue().
> 2.2. Based on "isAdd", decide whether to call add(..) or remove(..).
> 2.3. Forward the aggregate value periodically, based on the emit duration, to
> the sink node with the intermediate topic, with key <agg-key> and value
> Change<agg-value>.
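
The two stages described above can be modelled in memory as follows. This is only a rough sketch under stated assumptions, not the code in the pull request: the class names (TableAggregationSketch, Repartitioned, Aggregator) are hypothetical, a plain List stands in for the intermediate topic, a HashMap stands in for the K-V store, and the periodic emit is replaced by returning a Change for every record. Emitting the removal before the addition is also an assumption; the description does not specify an order.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative in-memory model only; none of these types are the Kafka Streams classes.
class TableAggregationSketch {

    // A table update: the new value and the value it replaces (either may be null).
    static final class Change<V> {
        final V newValue;
        final V oldValue;
        Change(V newValue, V oldValue) { this.newValue = newValue; this.oldValue = oldValue; }
    }

    // Hypothetical record on the intermediate topic: <agg-key> -> <selected-value, isAdd>.
    static final class Repartitioned<K1, V1> {
        final K1 aggKey;
        final V1 value;
        final boolean isAdd;
        Repartitioned(K1 aggKey, V1 value, boolean isAdd) {
            this.aggKey = aggKey; this.value = value; this.isAdd = isAdd;
        }
    }

    // Stage one: stateless. Apply the selector to the old and the new value
    // and emit one message per non-null side (removal first, by assumption).
    static <V, K1, V1> List<Repartitioned<K1, V1>> stageOne(
            Change<V> change, Function<V, Map.Entry<K1, V1>> selector) {
        List<Repartitioned<K1, V1>> out = new ArrayList<>();
        if (change.oldValue != null) {
            Map.Entry<K1, V1> e = selector.apply(change.oldValue);
            out.add(new Repartitioned<>(e.getKey(), e.getValue(), false));
        }
        if (change.newValue != null) {
            Map.Entry<K1, V1> e = selector.apply(change.newValue);
            out.add(new Repartitioned<>(e.getKey(), e.getValue(), true));
        }
        return out;
    }

    // Stage two: stateful. The map plays the role of the <agg-key> : <agg-value> store.
    static final class Aggregator<K1, V1, T> {
        final Map<K1, T> store = new HashMap<>();
        private final Supplier<T> initialValue;
        private final BiFunction<T, V1, T> add;
        private final BiFunction<T, V1, T> remove;

        Aggregator(Supplier<T> initialValue, BiFunction<T, V1, T> add, BiFunction<T, V1, T> remove) {
            this.initialValue = initialValue; this.add = add; this.remove = remove;
        }

        // Fetch the aggregate (or initialise it), apply add/remove based on isAdd,
        // store the result, and return the change in the aggregate.
        Change<T> process(Repartitioned<K1, V1> record) {
            T oldAgg = store.get(record.aggKey);
            T base = (oldAgg != null) ? oldAgg : initialValue.get();
            T newAgg = record.isAdd ? add.apply(base, record.value) : remove.apply(base, record.value);
            store.put(record.aggKey, newAgg);
            return new Change<>(newAgg, oldAgg);
        }
    }

    // Example: a table of user -> region, aggregated into a count of users per region.
    static Map<String, Long> demo() {
        Function<String, Map.Entry<String, Long>> selector = region -> new SimpleEntry<>(region, 1L);
        Aggregator<String, Long, Long> agg =
                new Aggregator<>(() -> 0L, (a, v) -> a + v, (a, v) -> a - v);
        List<Change<String>> updates = Arrays.asList(
                new Change<String>("eu", null),   // first user appears in "eu"
                new Change<String>("eu", null),   // second user appears in "eu"
                new Change<String>("us", "eu"));  // first user moves from "eu" to "us"
        for (Change<String> change : updates) {
            for (Repartitioned<String, Long> record : stageOne(change, selector)) {
                agg.process(record);
            }
        }
        return agg.store;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this example the third update produces two intermediate messages, a removal keyed by "eu" and an addition keyed by "us", so the store ends with a count of 1 for each region. Carrying the isAdd flag in the message value is what lets stage two undo the old row's contribution without having to look up the original table.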
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)