Hello,

I'm playing around with the brand new SessionWindows. I have a simple
topology such as:

KStream<String, JsonObject> sess =
    builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
sess
    .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
    .groupByKey(stringSerde, jsonSerde)
    .aggregate(
        MySession::new,
        MySession::aggregateSessions,
        MySession::mergeSessions,
        SessionWindows
            .with(WINDOW_INACTIVITY_GAPS_MS)
            .until(WINDOW_MAINTAIN_DURATION_MS))
    .filter(MySession::filterOutZeroLenghtSessions)
    .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
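
For reference, here is a minimal sketch of the shape the three method references in the topology above need to have: a no-arg constructor as the initializer, a (key, value, aggregate) aggregator, and a (key, aggregate, aggregate) merger. The fields (start, end, events), the lengthMs() helper, and the use of a plain event timestamp in place of the JsonObject value are assumptions made for illustration only; the real MySession is not shown in this post.

```java
final class SessionShapeSketch {
    // Hypothetical aggregate; field names are illustrative, not the real MySession.
    static final class MySession {
        long start = Long.MAX_VALUE;
        long end = Long.MIN_VALUE;
        int events = 0;

        // Same argument order as an aggregator: (key, value, current aggregate).
        // The value is reduced to an event timestamp here (an assumption).
        static MySession aggregateSessions(String userId, long eventTs, MySession agg) {
            agg.start = Math.min(agg.start, eventTs);
            agg.end = Math.max(agg.end, eventTs);
            agg.events++;
            return agg;
        }

        // Same argument order as a session merger: (key, aggregate one, aggregate two).
        static MySession mergeSessions(String userId, MySession a, MySession b) {
            MySession merged = new MySession();
            merged.start = Math.min(a.start, b.start);
            merged.end = Math.max(a.end, b.end);
            merged.events = a.events + b.events;
            return merged;
        }

        // A freshly initialized session that never saw an event has length 0.
        long lengthMs() {
            return events == 0 ? 0 : end - start;
        }
    }
}
```

In this sketch a session created by the initializer but never aggregated keeps events == 0, which is the kind of session a zero-length filter would drop.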

These are the most important configuration values I'm using; all the other
configs are the usual serdes and host properties:

// Millisecond values as long (needs java.util.concurrent.TimeUnit)
private static final long WINDOW_INACTIVITY_GAPS_MS =
    TimeUnit.MINUTES.toMillis(5);
private static final long WINDOW_MAINTAIN_DURATION_MS =
    TimeUnit.MINUTES.toMillis(5 + 2);

private static final Properties props = new Properties();
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
ONE_DAY);

The source stream has data arriving at around 100 messages/second.

I'm seeing the following behaviours:

1) MySession::new is called thousands of times, far more often than the
number of messages ingested (around 100 to 1000 times more). Most of these
sessions never reach the end of the pipeline (even if I remove
.filter(MySession::filterOutZeroLenghtSessions)), and neither
MySession::aggregateSessions nor MySession::mergeSessions is invoked for
them.

Is this correct? I don't understand; maybe I've set something up wrong...

2) The stream pipeline ingests the first 15 minutes of data, and the
sessions that reach SINK_TOPIC_KTABLE look good. However:
   - with every second that passes, the pipeline gets slower and slower;
   - I still see new updates to old sessions even after the
     .until(WINDOW_MAINTAIN_DURATION_MS) period;
   - the stream consumer ingests new data at slower and slower rates as
     time passes, eventually reaching almost 0 msg/sec.

I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only new
sessions, and that sessions which have already fired would simply be
removed from the session store and never touched again.
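
To make that expectation concrete, here is a small plain-Java sketch of how I understand session windowing by inactivity gap, plus the "no more updates after the maintain duration" rule I was expecting. It is not the Kafka Streams implementation; the names sessionize and expectedToBeClosed are hypothetical, and treating an event exactly one gap away as part of the same session is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionGapSketch {
    /** Groups ascending event timestamps of ONE key into [start, end] sessions. */
    static List<long[]> sessionize(List<Long> sortedTimestamps, long gapMs) {
        List<long[]> sessions = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long[] last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && ts - last[1] <= gapMs) {
                last[1] = ts; // within the inactivity gap: extend the current session
            } else {
                sessions.add(new long[] {ts, ts}); // gap exceeded: start a new session
            }
        }
        return sessions;
    }

    /** My expectation only: a session whose end is older than the maintain
     *  duration should receive no further updates. */
    static boolean expectedToBeClosed(long sessionEndMs, long nowMs, long maintainMs) {
        return nowMs - sessionEndMs > maintainMs;
    }
}
```

With a 5-minute gap, events at minutes 0, 2, 4 and 10 for one user give two sessions, [0, 4] and [10, 10]. With a 7-minute maintain duration, I would expect the first session to stop changing once the stream has moved past minute 11.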


At first I thought my pipeline was not set up correctly; however, I have
followed the docs closely and could not find where things go wrong.

Do you have any hints about this?
Please let me know if you need more info.

Thanks a lot,
Marco
