Hi Marco,

Can you try setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 and
see if that resolves the issue?
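
For example (a minimal sketch; it assumes the rest of your existing config
stays unchanged):

    Properties props = new Properties();
    // ... your existing application id, bootstrap servers and serde configs ...
    // set the record cache to 0 bytes so aggregation updates are forwarded downstream immediately
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);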

Thanks,
Damian


On Mon, 6 Mar 2017 at 10:59 Damian Guy <damian....@gmail.com> wrote:

> Hi Marco,
>
> Your config etc look ok.
>
> 1. It is pretty hard to tell what is going on from just your code below,
> unfortunately. But the behaviour doesn't seem to be in line with what I'm
> reading in the streams code. For example, your MySession::new function
> should be called once per record. The merger and aggregator should be
> called pretty much immediately after that.
>
> 2. Data will be retained for a bit longer than the value used in
> SessionWindows.until(..). The session store has 3 segments and we use the
> retention period (i.e., the value of until()) to determine the segment length.
> The segment length is calculated as:
>
>  Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
>
> Which in this case is 210000 milliseconds. So maintaining 3 segments means
> there could be data that is about 10 minutes old.
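>
> For example, plugging in your numbers (a rough sketch; the 60000 ms
> MIN_SEGMENT_INTERVAL default is an assumption on my side):
>
>   long retentionPeriod = 7 * 60 * 1000L;   // until() = 5 + 2 minutes = 420000 ms
>   int numSegments = 3;
>   long minSegmentInterval = 60 * 1000L;    // assumed MIN_SEGMENT_INTERVAL default
>   long segmentLength =
>       Math.max(retentionPeriod / (numSegments - 1), minSegmentInterval); // 210000 ms
>   long oldestPossibleData = numSegments * segmentLength; // 630000 ms, roughly 10 minutes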
>
> Also, this is completely driven by the data, specifically the time
> extracted from it. Could you provide a sample of the data going through
> the system? It might help in debugging the issue (I'm not seeing anything
> obvious in the code).
> It might also help if you can get some stack traces from the streams
> instances that appear to be stuck.
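>
> For example (assuming you can get a shell on the hosts; the pid below is
> hypothetical):
>
>   jps -l                                # find the pid of the streams application
>   jstack 12345 > streams-threads.txt    # dump all thread stacks for that pid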
>
> Thanks,
> Damian
>
> On Mon, 6 Mar 2017 at 09:59 Marco Abitabile <marco.abitab...@gmail.com>
> wrote:
>
> Hello,
>
> I'm playing around with the brand new SessionWindows. I have a simple
> topology such as:
>
> KStream<String, JsonObject> sess =
>  builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> sess
>     .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
>     .groupByKey(stringSerde, jsonSerde)
>     .aggregate(
>         MySession::new,
>         MySession::aggregateSessions,
>         MySession::mergeSessions,
>         SessionWindows
>             .with(WINDOW_INACTIVITY_GAPS_MS)
>             .until(WINDOW_MAINTAIN_DURATION_MS))
>     .filter(MySession::filterOutZeroLenghtSessions)
>     .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
>
> these are the most important configurations I'm using; all the other
> configs are the usual serde and host props:
>
> private static final long WINDOW_INACTIVITY_GAPS_MS = 5 * 60 * 1000L;  // 5 minutes
> private static final long WINDOW_MAINTAIN_DURATION_MS =
>     WINDOW_INACTIVITY_GAPS_MS + 2 * 60 * 1000L;                        // 5 + 2 minutes
>
> private static final Properties props = new Properties();
>
> props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
> ONE_DAY);
>
> The source stream has data arriving at around 100 messages/second
>
> I'm experiencing this behaviours:
>
> 1) MySession::new is called thousands of times, way more than the number
> of messages ingested (around 100 to 1000 times more). Most of these
> sessions never reach the end of the pipeline (even if I remove
> .filter(MySession::filterOutZeroLenghtSessions) ), and neither
> MySession::aggregateSessions nor MySession::mergeSessions is invoked.
>
> Is this correct? I don't understand; maybe I've set up something wrong...
>
> 2) I can see that the stream pipeline ingests the first 15 minutes of
> data, and the sessions that reach SINK_TOPIC_KTABLE look good. However:
>    - with every second that passes the pipeline gets slower and slower
>    - I can see new updates to old sessions even after the
> .until(WINDOW_MAINTAIN_DURATION_MS) period
>    - the stream consumer ingests new data at slower and slower rates as
> time passes, eventually reaching almost 0 msg/sec
>
> I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only new
> sessions, and that those already fired would simply be removed from the
> session store and never touched again.
>
>
> At the beginning I thought my pipeline was not set up correctly; however,
> I've tried to follow the docs closely and I could not find where things
> could go wrong.
>
> Do you have any hints about this?
> Please let me know if you need more info.
>
> thanks a lot,
> Marco
>
>
