Hi Marco,

Can you try setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 and see if that resolves the issue?
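Something along these lines, as a minimal sketch only. It assumes the setting meant here is the record cache size, whose key is "cache.max.bytes.buffering"; the key is written as a literal so the snippet compiles without the Kafka jars. The class and method names are placeholders, not part of any Kafka API.

```java
import java.util.Properties;

// Hypothetical helper class; only the config key comes from Kafka Streams.
class CacheConfigSketch {
    // Assumed to be the key behind StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
    // spelled out as a literal so the sketch needs no Kafka dependency.
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";

    // Returns a copy of the given properties with the record cache set to 0 bytes.
    static Properties withCachingDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // A cache size of 0 disables record caching in the streams instance.
        props.put(CACHE_MAX_BYTES_BUFFERING, 0);
        return props;
    }

    public static void main(String[] args) {
        Properties props = withCachingDisabled(new Properties());
        System.out.println(props);
    }
}
```

In the real application the same entry would go into the existing props object before the KafkaStreams instance is created.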
Thanks,
Damian

On Mon, 6 Mar 2017 at 10:59 Damian Guy <damian....@gmail.com> wrote:

> Hi Marco,
>
> Your config etc. look ok.
>
> 1. It is pretty hard to tell what is going on from just your code below,
> unfortunately. But the behaviour doesn't seem to be in line with what I'm
> reading in the streams code. For example, your MySession::new function
> should be called once per record. The merger and aggregator should be
> called pretty much immediately after that.
>
> 2. Data will be retained for a bit longer than the value used in
> SessionWindows.until(..). The session store has 3 segments and we use the
> retention period (i.e., value of until()) to determine the segment length.
> The segment length is calculated as:
>
> Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
>
> Which in this case is 210000 milliseconds. So maintaining 3 segments means
> there could be data that is about 10 minutes old.
>
> Also this is completely driven by the data and specifically the time
> extracted from the data. I'm not sure if you can provide a sample of the
> data going through the system? It might be helpful in trying to debug the
> issue. (I'm not seeing anything obvious in the code).
> Also it might help if you can get some stack traces on the streams
> instances that appear to be stuck.
>
> Thanks,
> Damian
>
> On Mon, 6 Mar 2017 at 09:59 Marco Abitabile <marco.abitab...@gmail.com>
> wrote:
>
> Hello,
>
> I'm playing around with the brand new SessionWindows. I have a simple
> topology such as:
>
> KStream<String, JsonObject> sess =
>     builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> sess
>     .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
>     .groupByKey(stringSerde, jsonSerde)
>     .aggregate(
>         MySession::new,
>         MySession::aggregateSessions,
>         MySession::mergeSessions,
>         SessionWindows
>             .with(WINDOW_INACTIVITY_GAPS_MS)
>             .until(WINDOW_MAINTAIN_DURATION_MS),
>     .filter(MySession::filterOutZeroLenghtSessions)
>     .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
>
> These are the most important configurations I'm using; all the other
> configs are the usual serdes and hosts props:
>
> private static final String WINDOW_INACTIVITY_GAPS_MS = 5_MINUTES
> private static final String WINDOW_MAINTAIN_DURATION_MS = 5_MINUTES + 2_MINUTES;
>
> private static final Properties props = new Properties();
>
> props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
>     ONE_DAY);
>
> The source stream has data arriving at around 100 messages/second.
>
> I'm experiencing these behaviours:
>
> 1) MySession::new is called thousands of times, far more than the number
> of messages ingested (around 100 to 1000 times more). Most of these
> sessions never reach the end of the pipeline (even if I remove
> .filter(MySession::filterOutZeroLenghtSessions)), and neither
> MySession::aggregateSessions nor MySession::mergeSessions is invoked.
>
> Is this correct? I don't understand, maybe I've set up something wrong...
>
> 2) I can see that the stream pipeline can ingest the first 15 minutes of
> data, and the sessions that reach SINK_TOPIC_KTABLE look good. However:
> - with every second that passes, the pipeline gets slower and slower
> - I can see new updates to old sessions even after the
>   .until(WINDOW_MAINTAIN_DURATION_MS) period
> - the stream consumer starts to ingest new data at slower and slower
>   rates as time passes, eventually reaching almost 0 msg/sec
>
> I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only new
> sessions, and that those that have already been fired would just be removed
> from the session store and never touched again.
>
> At the beginning I was thinking that my pipeline was not set up correctly;
> however, I've tried to follow the docs slavishly and I could not find where
> things can go wrong.
>
> Do you have some hints about this?
> Please let me know if you need more info.
>
> Thanks a lot,
> Marco
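
For reference, the segment-length arithmetic from point 2 of the quoted reply can be worked through as below. This is only a sketch under assumptions: the value of MIN_SEGMENT_INTERVAL is not given in the thread and is taken here to be 60 seconds, the retention is taken as 5 + 2 minutes from the posted config, and the class and method names are made up for illustration.

```java
// Hypothetical stand-alone sketch of the arithmetic quoted in the thread.
class SegmentRetentionSketch {
    // Assumption: 60 seconds. The thread does not state this value.
    static final long MIN_SEGMENT_INTERVAL = 60_000L;

    // Same expression as in the quoted reply.
    static long segmentInterval(long retentionPeriodMs, int numSegments) {
        return Math.max(retentionPeriodMs / (numSegments - 1), MIN_SEGMENT_INTERVAL);
    }

    public static void main(String[] args) {
        long retentionMs = (5 + 2) * 60_000L; // until(5 minutes + 2 minutes)
        int numSegments = 3;                  // per the quoted reply

        long segmentMs = segmentInterval(retentionMs, numSegments);
        // With all segments kept, data can be up to numSegments * segmentMs old.
        long oldestMs = segmentMs * numSegments;

        System.out.println("segment length (ms): " + segmentMs);
        System.out.println("oldest retained data (ms): " + oldestMs);
    }
}
```

With these inputs the segment length is 420000 / 2 = 210000 ms, and three segments cover 630000 ms, which is the "about 10 minutes" mentioned in the reply.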