Seems straightforward enough: I have a 'foreach' after my windowed
aggregation and I see values like these come out:

(window) start: 1480444200000  end: 1480445400000

 (record) epoch='1480433282000'


If I have a 20-minute window with a 1-minute 'step', I see my record come
out of the aggregation 20 times, each with a different window start/end.
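As a rough check on the 20x behavior: a 20-minute window that advances by 1 minute gives size / advance = 20 overlapping windows per record. The sketch below is plain Java (no Kafka dependency) and assumes hopping windows are aligned to multiples of the advance interval counted from epoch 0, which is how I understand TimeWindows aligns them. The class and method names are hypothetical. It lists the windows the record timestamp above should land in and checks the window it was actually reported in.

```java
import java.util.ArrayList;
import java.util.List;

class HoppingWindows {
    // Start times of every hopping window [start, start + sizeMs) that contains
    // timestampMs, ASSUMING starts are aligned to multiples of advanceMs from epoch 0.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long start = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    // True if timestampMs falls in the half-open window [startMs, startMs + sizeMs).
    static boolean contains(long startMs, long sizeMs, long timestampMs) {
        return startMs <= timestampMs && timestampMs < startMs + sizeMs;
    }

    public static void main(String[] args) {
        long size = 20 * 60 * 1000L;   // TimeWindows.of(20 * 60 * 1000)
        long advance = 60 * 1000L;     // .advanceBy(60 * 1000)
        long epoch = 1480433282000L;   // record epoch from the log above

        List<Long> starts = windowStartsFor(epoch, size, advance);
        System.out.println(starts.size() + " windows, first start " + starts.get(0)
                + ", last start " + starts.get(starts.size() - 1));

        // Window the record was reported in (start value from the log above).
        System.out.println("reported window contains record: "
                + contains(1480444200000L, size, epoch));
    }
}
```

With the values from the log, this reports 20 windows with starts from 1480432140000 through 1480433280000, and the reported window starting at 1480444200000 does not contain the record timestamp 1480433282000.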

On Tue, Nov 29, 2016 at 11:06 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Let us know if we can help with that, what problems are you seeing with
> records in wrong windows?
>
> Eno
>
> > On 29 Nov 2016, at 19:02, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >
> > I've been having problems with records appearing in windows that they
> > clearly don't belong to. Was curious whether this was related but it
> > seems not. Bummer.
> >
> > On Tue, Nov 29, 2016 at 8:52 AM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Hi Jon,
> >>
> >> There is an optimization in
> >> org.apache.kafka.streams.kstream.internals.WindowedSerializer/Deserializer
> >> where we don't encode and decode the end of the window, since the user
> >> can always calculate it. So instead we return a default of
> >> Long.MAX_VALUE, which is the big number you see.
> >>
> >> In other words, use window().start() but not window().end() in this case.
> >> If you want to print both, just add the window size to window().start().
> >>
> >> Thanks
> >> Eno
> >>> On 29 Nov 2016, at 16:17, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >>>
> >>> Using the following topology:
> >>>
> >>> KStream<String, SumRecord> kStream =
> >>>         kStreamBuilder.stream(stringSerde, transSerde, TOPIC);
> >>> KTable<Windowed<String>, SumRecordCollector> ktAgg =
> >>>         kStream.groupByKey().aggregate(
> >>>                 SumRecordCollector::new,
> >>>                 new Aggregate(),
> >>>                 // 20-minute hopping windows that advance every minute
> >>>                 TimeWindows.of(20 * 60 * 1000).advanceBy(60 * 1000),
> >>>                 cSerde, "table_stream");
> >>>
> >>>
> >>> When looking at windows as follows:
> >>>
> >>> ktAgg.toStream().foreach((postKey, postValue) -> {
> >>>     LOGGER.debug("start: {}   end: {}",
> >>>             postKey.window().start(), postKey.window().end());
> >>> });
> >>>
> >>> The 'start' values are coming through properly incremented, but the
> >>> 'end' values are all 9223372036854775807.
> >>>
> >>> Is there something wrong with my topology? Some other bug that would
> >>> cause this?
> >>
> >>
>
>
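Eno's workaround in the quoted reply, recomputing the end as window().start() plus the window size, can be sketched in plain Java so it runs without Kafka on the classpath. `WindowEnd`, `resolveEnd` and `WINDOW_SIZE_MS` are hypothetical names. The sketch assumes, per Eno's explanation, that the deserialized end comes back as Long.MAX_VALUE, and that the size is the same 20 minutes passed to TimeWindows.of(...).

```java
class WindowEnd {
    // Hypothetical constant: must match the size given to TimeWindows.of(...).
    static final long WINDOW_SIZE_MS = 20 * 60 * 1000L;

    // Per the quoted explanation, the end is not serialized and comes back as
    // Long.MAX_VALUE; in that case rebuild it from the start and the known size.
    static long resolveEnd(long startMs, long reportedEndMs, long sizeMs) {
        return reportedEndMs == Long.MAX_VALUE ? startMs + sizeMs : reportedEndMs;
    }

    public static void main(String[] args) {
        // Inside the quoted foreach this would be roughly:
        //   long start = postKey.window().start();
        //   long end = resolveEnd(start, postKey.window().end(), WINDOW_SIZE_MS);
        long start = 1480444200000L;        // a window start from the log above
        long reportedEnd = Long.MAX_VALUE;  // what window().end() returned
        long end = resolveEnd(start, reportedEnd, WINDOW_SIZE_MS);
        System.out.println("start: " + start + "   end: " + end);
    }
}
```

For the start value in the log at the top of the thread, this yields an end of 1480445400000, the same start/end pair shown there. Keeping the size in one constant shared with the TimeWindows.of(...) call avoids the two drifting apart.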
