Thanks Damian,

sure, you are right, these details are modified to be compliant with my
company rules. However the main points are unchanged.

The producer of the original data is a "data ingestor" that attach few
extra fields and produces a message such as:

row = new JsonObject({
  "id" : 12345654,
  "userDeviceId" : "",
  "creationTime" : 1488801350660 //produced from the remote source
  "receivedTime": 1488801363455 //placed by my data ingestor,
  "extra_data1" : 123, //
  "extra_data2" : 456  // extra data specific for my domain all this data
are numbers
  "extra_data2" : 789  //

then it sends records into SOURCE_TOPIC (that in this context is

long creationTimestamp = row.getLong("creationTime");
long rowId = row.getLong("id");
ProducerRecord<String, String> producerRecord = new
ProducerRecord<>(USER_ACTIVITIES_TOPIC, 0, creationTimestamp, rowId,

- I'm using only one partition (right now. I'm still not in production and
i'm discovering the feature) in production environment I would use more
- the message is a simple string containing json object (i'm not using Avro
or similar)

- in my streaming application:

public class MySession{

    private final JsonObject sessionDetails;

    public MySession(){
        this.sessionDetails = new JsonObject();

    public static KeyValue enhanceWithUserId_And_PutUserIdAsKey(String k,
JsonObject j) {
        int userId = cache.get(j.get("userDeviceId"));
        return KeyValue.pair(userId, j);

    public static MySession aggregate(String key, JsonObject value,
MySession aggregate) {
        //basically MySession is a collection of all the raw data that the
session is composed of
        return aggregate;

    public static MySession merge(String key, MySession arg1, MySession arg2)
        return arg2;


BTW (this will be a topic for another thread anyway...) is there a way to
be con control of MySession lifecycle? I was thinking to pool them to
reduce GC workload.

thanks a lot for your precious help.


2017-03-06 11:59 GMT+01:00 Damian Guy <>:

> Hi Marco,
> Your config etc look ok.
> 1. It is pretty hard to tell what is going on from just your code below,
> unfortunately. But the behaviour doesn't seem to be inline with what I'm
> reading in the streams code. For example your MySession::new function
> should be called once per record. The merger and aggregator should be
> called pretty much immediately after that.
> 2. Data will be retained for a bit longer than the value used in
> SessionWindows.until(..). The session store has 3 segments and we use the
>  retention period (i.e., value of until()) to determine the segment length.
> The segment length is calculated as:
>  Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
> Which in this case is 210000 milliseconds. So maintaining 3 segments means
> there could be data that is about 10 minutes old.
> Also this is completely driven by the data and specifically the time
> extracted from the data. I'm not sure if you can provide a sample of the
> data going through the system? It might be helpful in trying to debug the
> issue. (I'm not seeing anything obvious in the code).
> Also it might help if you can get some stack traces on the streams
> instances that appear to be stuck.
> Thanks,
> Damian
> On Mon, 6 Mar 2017 at 09:59 Marco Abitabile <>
> wrote:
> > Hello,
> >
> > I'm playing around with the brand new SessionWindows. I have a simple
> > topology such as:
> >
> > KStream<String, JsonObject> sess =
> >, jsonSerde, SOURCE_TOPIC);
> > sess
> >     .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
> >     .groupByKey(stringSerde, jsonSerde)
> >     .aggregate(
> >         MySession::new,
> >         MySession::aggregateSessions,
> >         MySession::mergeSessions,
> >         SessionWindows
> >             .with(WINDOW_INACTIVITY_GAPS_MS)
> >             .until(WINDOW_MAINTAIN_DURATION_MS),
> >     .filter(MySession::filterOutZeroLenghtSessions)
> >     .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
> >
> > these are the most important configuration I'm using, all the other
> configs
> > are the classical serdes and hosts props:
> >
> > private static final String WINDOW_INACTIVITY_GAPS_MS = 5_MINUTES
> > private static final String WINDOW_MAINTAIN_DURATION_MS = 5_MINUTES +
> > 2_MINUTES;
> >
> > private static final Properties props = new Properties();
> >
> > ONE_DAY);
> >
> > The source stream has data arriving at around 100 messages/second
> >
> > I'm experiencing this behaviours:
> >
> > 1) MySession::new is called thousands of times, way way more of the
> number
> > of messages ingested (around 100 / 1000 times more) the most of this
> > sessions never reach the end of the pipeline (even if I remove
> > .filter(MySession::filterOutZeroLenghtSessions) ) and nor
> > MySession::aggregateSessions
> > and MySession::mergeSessions are invoked.
> >
> > Is this correct? I don't understand, maybe I've setup something wrong...
> >
> > 2) I can see that the stream pipeline can ingest the first 15 minutes of
> > data and sessions that reach SINK_TOPIC_KTABLE  looks good. However:
> >    - every second that passes the pipeline gets slower and slower and
> >    - I can see new updates to old sessions also after
> > period.
> >    - the stream consumer starts to ingest new data with slower and slower
> > rates as time passes, eventually reaching almost 0msg/sec
> >
> > I was expecting that after WINDOW_MAINTAIN_DURATION_MS i can see only new
> > sessions and those that have been fired, will just be removed from
> session
> > store and never touched again.
> >
> >
> > At the beginning I was thinking that my pipeline was not setup correctly,
> > however I've tried to follow slavishly the docs and I could not find
> where
> > things can go wrong.
> >
> > Do you have some hints about this?
> > Please let me know if you need more info about.
> >
> > thanks a lot,
> > Marco
> >

Reply via email to