Hello Sachin,

Which version of Kafka are you using for this application?
Guozhang

On Tue, Nov 15, 2016 at 9:52 AM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I have a simple pipeline:
>
> stream.aggregateByKey(new Initializer<List<Value>>() {
>     public List<Value> apply() {
>         return new ArrayList<>();
>     }
> }, new Aggregator<Key, Value, List<Value>>() {
>     public List<Value> apply(Key key, Value value, List<Value> list) {
>         list.add(value);
>         return list;
>     }
> }, keysSerde, valuesSerde, "table");
>
> This basically aggregates a list of values by key from a source stream.
> This is working fine.
>
> However, over time the list will grow very big, so I thought of using a
> windowed table:
>
> stream.aggregateByKey(new Initializer<List<Value>>() {
>     public List<Value> apply() {
>         return new ArrayList<>();
>     }
> }, new Aggregator<Key, Value, List<Value>>() {
>     public List<Value> apply(Key key, Value value, List<Value> list) {
>         list.add(value);
>         return list;
>     }
> }, TimeWindows.of("table", sizeTS).advanceBy(advanceTS), keysSerde,
>     valuesSerde);
>
> It is basically the same code as above, but what I find is that it
> aggregates only one value for a given windowed key, so the size of the
> list is always one.
>
> What I understood is that it will put the source values in a time bucket
> based on the timestamps returned by the timestamp extractor. When I check
> the time window, I see that each value's timestamp lies between the
> window's bounds.
>
> However, I don't understand why it always aggregates only a single value.
>
> So downstream I always get something like
>
> (key, start, end) -> [value1]
> (key, start, end) -> [value2]
>
> and not
>
> (key, start, end) -> [value1, value2]
>
> Note that both value1 and value2 are between the start and end bounds.
>
> In the first (non-windowed) case, however, I get
> key -> [value1, value2], which is what I expect.
>
> So please let me know if I am missing something in my windowed
> aggregation, or if there is something else to be done to get the output
> I want.
>
> Thanks
> Sachin

--
-- Guozhang
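To make the output Sachin expects concrete, here is a minimal plain-Java sketch (no Kafka dependency) of how a hopping window of size `size` advancing by `advance` is assigned to a record from its timestamp, and of the list aggregate one would expect per windowed key. It assumes epoch-aligned windows whose starts are multiples of `advance`, each covering the half-open interval `[start, start + size)`, with `0 < advance <= size`. That is how hopping time windows are documented to behave in Kafka Streams, but this is not Kafka's own code, and the class, method and `Event` names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Kafka dependency) of hopping-window assignment and a
// windowed "collect values into a list" aggregation. All names are hypothetical.
public class HoppingWindowSketch {

    // Hypothetical input record: key, event timestamp in ms, and value.
    static final class Event {
        final String key;
        final long timestamp;
        final String value;

        Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Returns the start of every window [start, start + size) that contains
    // `timestamp`, where window starts are multiples of `advance`.
    // Assumes 0 < advance <= size (hopping or tumbling windows).
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window end is still after `timestamp`.
        long start = (Math.max(0, timestamp - size + advance) / advance) * advance;
        while (start <= timestamp) {
            starts.add(start);
            start += advance;
        }
        return starts;
    }

    // Builds "(key, start, end) -> [values...]" by appending each event's value
    // to the list of every window it falls into.
    static Map<String, List<String>> aggregate(List<Event> events, long size, long advance) {
        Map<String, List<String>> table = new LinkedHashMap<>();
        for (Event e : events) {
            for (long start : windowStartsFor(e.timestamp, size, advance)) {
                String windowedKey = "(" + e.key + ", " + start + ", " + (start + size) + ")";
                table.computeIfAbsent(windowedKey, k -> new ArrayList<>()).add(e.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("A", 1200L, "value1"));
        events.add(new Event("A", 1300L, "value2"));

        // Window size 1000 ms, advancing by 500 ms.
        for (Map.Entry<String, List<String>> entry : aggregate(events, 1000L, 500L).entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
        // prints:
        // (A, 500, 1500) -> [value1, value2]
        // (A, 1000, 2000) -> [value1, value2]
    }
}
```

With two values for key `A` at timestamps 1200 and 1300, both fall into the windows `[500, 1500)` and `[1000, 2000)`, so under these assumptions each windowed key should map to `[value1, value2]`, which is the result described as expected in the question.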