I am using kafka_2.10-0.10.0.1.
Say I have a window of 60 minutes advancing by 15 minutes.
If the streams app, using the timestamp extractor, places a message into one
or more buckets, it will get aggregated in each of those buckets.
I assume this statement is correct.
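
As a sanity check of that understanding (a minimal sketch that does not use
the Kafka API at all; the 60/15 minute values are the ones above, and the
class and method names are my own), the hopping windows containing a given
timestamp can be computed like this:

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindows {
    // Returns the start times of all hopping windows [start, start + size)
    // that contain the given timestamp.
    static List<Long> windowsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window start (on the advance grid) that can still
        // contain the timestamp.
        long start = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            if (timestampMs < start + sizeMs) {
                starts.add(start);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60 * 60 * 1000L;    // 60-minute windows
        long advance = 15 * 60 * 1000L; // advancing by 15 minutes
        // A record at minute 70 falls into the 4 overlapping windows
        // starting at minutes 15, 30, 45 and 60.
        System.out.println(HoppingWindows.windowsFor(70 * 60 * 1000L, size, advance));
    }
}
```

So with size 60 and advance 15, each record should land in size/advance = 4
buckets, and each of those buckets should aggregate it.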

Also, when I restart the streams application, bucket aggregation should
resume from the point where it last stopped.
I hope this is also correct.

What I noticed was that once a message was placed in a bucket, that bucket
was not getting any new messages.

However, when I ran a small test case replicating that scenario, it worked
properly. There may be some issue with the application reset.
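
For reference, the behaviour the test case showed (and that I expect) can be
simulated without Kafka at all. This is only a sketch of windowed list
aggregation using a plain map keyed by (key, windowStart); the names and
store layout are hypothetical, not the Kafka Streams internals:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowedAggregation {
    static final long SIZE_MS = 60 * 60 * 1000L;    // 60-minute windows
    static final long ADVANCE_MS = 15 * 60 * 1000L; // advancing by 15 minutes

    // (key, windowStart) -> aggregated list of values
    final Map<String, List<String>> store = new LinkedHashMap<>();

    // Adds a value to every hopping window that contains its timestamp,
    // mimicking what the windowed aggregation should do.
    void aggregate(String key, String value, long timestampMs) {
        long start = Math.max(0, timestampMs - SIZE_MS + ADVANCE_MS) / ADVANCE_MS * ADVANCE_MS;
        for (; start <= timestampMs; start += ADVANCE_MS) {
            store.computeIfAbsent(key + "@" + start, k -> new ArrayList<>()).add(value);
        }
    }

    public static void main(String[] args) {
        WindowedAggregation agg = new WindowedAggregation();
        // Two values for the same key, both inside the window starting at 0.
        agg.aggregate("key", "value1", 5 * 60 * 1000L);
        agg.aggregate("key", "value2", 10 * 60 * 1000L);
        // The bucket starting at t=0 should hold both values.
        System.out.println(agg.store.get("key@0"));
    }
}
```

Here the bucket starting at t=0 ends up with [value1, value2], i.e. one list
per (key, window) pair, not one list per record.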

Thanks
Sachin



On Fri, Nov 18, 2016 at 11:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Sachin,
>
> Which version of Kafka are you using for this application?
>
>
> Guozhang
>
>
> On Tue, Nov 15, 2016 at 9:52 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > I have a simple pipeline
> > stream.aggregateByKey(new Initializer<List<Value>>() {
> >     public List<Value> apply() {
> >         return new ArrayList<>();
> >     }
> > }, new Aggregator<Key, Value, List<Value>>() {
> >     public List<Value> apply(Key key, Value value, List<Value> list) {
> >         list.add(value);
> >         return list;
> >     }
> > }, keysSerde, valuesSerde, "table");
> >
> > So this basically aggregates a list of values by key from a source
> > stream. This is working fine.
> >
> > However, over time the list will grow very big, so I thought of using a
> > windowed table.
> >
> > stream.aggregateByKey(new Initializer<List<Value>>() {
> >     public List<Value> apply() {
> >         return new ArrayList<>();
> >     }
> > }, new Aggregator<Key, Value, List<Value>>() {
> >     public List<Value> apply(Key key, Value value, List<Value> list) {
> >         list.add(value);
> >         return list;
> >     }
> > }, TimeWindows.of("table", sizeTS).advanceBy(advanceTS), keysSerde,
> > valuesSerde);
> >
> > It is basically the above code, but what I find is that it aggregates
> > only one value for a given windowed key, so the size of the list is
> > always one.
> >
> > My understanding is that it puts the source values into time buckets
> > based on the timestamp extractor. When I check the time window, I see
> > that the value's timestamp is within the bounds of the window.
> >
> > However, I have not understood why it always aggregates only a single
> > value.
> >
> > So downstream I always get something like
> >
> > (key, start, end) -> [value1]
> > (key, start, end) -> [value2]
> > and not
> > (key, start, end) -> [value1, value2]
> > Note that both value1 and value2 are between the start and end bounds.
> >
> > However, in the first (non-windowed) case I get
> > key -> [value1, value2], which is what I expect.
> >
> > So please let me know if I am missing something in my windowed
> > aggregation.
> >
> > Or if there is something else to be done to get the output I want.
> >
> > Thanks
> > Sachin
> >
>
>
>
> --
> -- Guozhang
>
