Hello Sachin,

Which version of Kafka are you using for this application?


Guozhang


On Tue, Nov 15, 2016 at 9:52 AM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I have a simple pipeline
> stream.aggregateByKey(new Initializer<List<Value>>() {
>     public List<Value> apply() {
>         return new ArrayList<>();
>     }
> }, new Aggregator<Key, Value, List<Value>>() {
>     public List<Value> apply(Key key, Value value, List<Value> list) {
>         list.add(value);
>         return list;
>     }
> }, keysSerde, valuesSerde, "table");
>
> So this basically aggregates a list of values by some key of a source stream.
> This is working fine.
>
> However, over time the list will grow very big, so I thought of using a
> windowed table.
>
> stream.aggregateByKey(new Initializer<List<Value>>() {
>     public List<Value> apply() {
>         return new ArrayList<>();
>     }
> }, new Aggregator<Key, Value, List<Value>>() {
>     public List<Value> apply(Key key, Value value, List<Value> list) {
>         list.add(value);
>         return list;
>     }
> }, TimeWindows.of("table", sizeTS).advanceBy(advanceTS), keysSerde,
> valuesSerde);
>
> This is basically the same code, but I find that it aggregates only one
> value for a given windowed key, so the size of the list is always one.
>
> My understanding is that it puts the source values into time buckets
> based on the timestamp returned by the timestamp extractor. When I check a
> time window, I see that the value's timestamp lies between its bounds.
>
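
For reference, the bucketing described above can be sketched in plain Java,
without any Kafka dependency. This is only an illustration of the expected
hopping-window semantics, not Kafka's actual code: the class WindowSketch,
its methods, and the sample timestamps and sizes are all hypothetical, and it
assumes 0 < advance <= size with windows aligned to multiples of advance.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names come from Kafka Streams.
class WindowSketch {

    // Start times of every hopping window [start, start + size) containing the timestamp.
    // Assumes 0 < advance <= size and window starts at non-negative multiples of advance.
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long start = (Math.max(0L, timestamp - size + advance) / advance) * advance;
        while (start <= timestamp) {
            starts.add(start);
            start += advance;
        }
        return starts;
    }

    // Appends each record's value to the list of every window its timestamp falls into.
    static Map<String, List<String>> aggregate(String key, long[] timestamps,
                                               String[] values, long size, long advance) {
        Map<String, List<String>> table = new LinkedHashMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            for (long start : windowStartsFor(timestamps[i], size, advance)) {
                String windowedKey = "(" + key + ", " + start + ", " + (start + size) + ")";
                table.computeIfAbsent(windowedKey, k -> new ArrayList<>()).add(values[i]);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        // Two records with made-up timestamps 7 and 8, window size 10, advance 5.
        Map<String, List<String>> table = aggregate(
                "key", new long[] {7L, 8L}, new String[] {"value1", "value2"}, 10L, 5L);
        table.forEach((window, list) -> System.out.println(window + " -> " + list));
    }
}
```

With these sample inputs both records fall into the windows [0, 10) and
[5, 15), so each windowed key ends up with a two-element list. That is the
shape of output one would expect when the aggregate for a window is carried
over from one record to the next.
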
> However, I do not understand why it always aggregates only a single
> value.
>
> So downstream I always get something like
>
> (key, start, end) -> [value1]
> (key, start, end) -> [value2]
> and not
> (key, start, end) -> [value1, value2]
> Note that both value1 and value2 are between the start and end bounds.
>
> However, in the first case I get
> key -> [value1, value2], which is what I expect.
>
> So please let me know if I am missing something in my windowed aggregation.
>
> Or if there is something else to be done to get the output I want.
>
> Thanks
> Sachin
>



-- 
-- Guozhang
