Seems straightforward enough: I have a 'foreach' after my windowed aggregation and I see values like these come out:
(window) start: 1480444200000 end: 1480445400000 (record) epoch='1480433282000'

If I have a 20 minute window with a 1 minute 'step' I will see my record come out of the aggregation 20x - with different window start/end.

On Tue, Nov 29, 2016 at 11:06 AM, Eno Thereska <eno.there...@gmail.com> wrote:

> Let us know if we can help with that, what problems are you seeing with
> records in wrong windows?
>
> Eno
>
> > On 29 Nov 2016, at 19:02, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >
> > I've been having problems with records appearing in windows that they
> > clearly don't belong to. Was curious whether this was related but it
> > seems not. Bummer.
> >
> > On Tue, Nov 29, 2016 at 8:52 AM, Eno Thereska <eno.there...@gmail.com>
> > wrote:
> >
> >> Hi Jon,
> >>
> >> There is an optimization in
> >> org.apache.kafka.streams.kstream.internals.WindowedSerializer/Deserializer
> >> where we don't encode and decode the end of the window since the user
> >> can always calculate it. So instead we return a default of
> >> Long.MAX_VALUE, which is the big number you see.
> >>
> >> In other words, use window().start() but not window().end() in this
> >> case. If you want to print both, just add the window size to
> >> window().start().
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 29 Nov 2016, at 16:17, Jon Yeargers <jon.yearg...@cedexis.com> wrote:
> >>>
> >>> Using the following topology:
> >>>
> >>> KStream<String,SumRecord> kStream =
> >>>     kStreamBuilder.stream(stringSerde,transSerde,TOPIC);
> >>> KTable<Windowed<String>, SumRecordCollector> ktAgg =
> >>>     kStream.groupByKey().aggregate(
> >>>         SumRecordCollector::new,
> >>>         new Aggregate(),
> >>>         TimeWindows.of(20 * 60 * 1000).advanceBy(60 * 1000),
> >>>         cSerde, "table_stream");
> >>>
> >>> When looking at windows as follows:
> >>>
> >>> ktAgg.toStream().foreach((postKey, postValue) -> {
> >>>     LOGGER.debug("start: {} end: {}",
> >>>         postKey.window().start(), postKey.window().end());
> >>> });
> >>>
> >>> The 'start' values are coming through properly incremented but the
> >>> 'end' values are all 9223372036854775807.
> >>>
> >>> Is there something wrong with my topology? Some other bug that would
> >>> cause this?
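
For reference, the arithmetic behind "20 minute window, 1 minute step, record comes out 20x" can be sketched in plain Java. This is only an illustration and not the Kafka Streams implementation: the class and method names are made up, and it assumes window starts are aligned to multiples of the advance interval counted from epoch 0. The window end is computed as start + size, as Eno suggests above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka Streams.
class HoppingWindows {

    // Returns the start (ms) of every hopping window [start, start + sizeMs)
    // that contains timestampMs, assuming starts are multiples of advanceMs.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Smallest multiple of advanceMs that is strictly greater than
        // (timestampMs - sizeMs), clamped so it never goes below zero.
        long start = Math.max(0L, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (start <= timestampMs) {
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long sizeMs = 20 * 60 * 1000L;   // 20 minute window
        long advanceMs = 60 * 1000L;     // 1 minute step
        long epoch = 1480433282000L;     // record epoch from the log line above

        List<Long> starts = windowStartsFor(epoch, sizeMs, advanceMs);
        System.out.println("windows containing record: " + starts.size());
        for (long start : starts) {
            // End is derived from start, since window().end() is not reliable here.
            System.out.println("start: " + start + " end: " + (start + sizeMs));
        }
    }
}
```

Under those assumptions, comparing the start values this prints for a record's timestamp against the start values logged in the 'foreach' is one way to check whether a record really landed in a window it doesn't belong to.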