Hi Marco,

I've done some testing and found that there is a performance issue when
caching is enabled. I suspect this might be what you are hitting. It looks
to me like you can work around it by doing something like:

final StateStoreSupplier<SessionStore> sessionStore =
    Stores.create("session-store-name")
        .withKeys(Serdes.String())
        .withValues(mySessionSerde)
        .persistent()
        .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
        .build();


And then in your call to aggregate, pass in the sessionStore created above,
i.e.,

aggregate(
    MySession::new,
    MySession::aggregateSessions,
    MySession::mergeSessions,
    SessionWindows
        .with(WINDOW_INACTIVITY_GAPS_MS),
    mySessionSerde,
    sessionStore)
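
If it does turn out to be cache related, another option while you test (just
a sketch; note you will see more downstream updates, since consecutive
updates per key are no longer deduplicated) is to switch the streams record
cache off globally:

props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);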


Let us know how you get on.

Thanks,
Damian

On Mon, 6 Mar 2017 at 13:27 Marco Abitabile <marco.abitab...@gmail.com>
wrote:

> Thanks Damian,
>
> sure, you are right, these details are modified to be compliant with my
> company rules. However the main points are unchanged.
>
> The producer of the original data is a "data ingestor" that attaches a few
> extra fields and produces a message such as:
>
> row = new JsonObject({
>   "id" : 12345654,
>   "userDeviceId" : "",
>   "creationTime" : 1488801350660,  // produced by the remote source
>   "receivedTime" : 1488801363455,  // set by my data ingestor
>   "extra_data1" : 123,             // extra data specific to my domain;
>   "extra_data2" : 456,             // all these fields are numbers
>   "extra_data3" : 789
> })
>
> then it sends records into SOURCE_TOPIC (which in this context is
> USER_ACTIVITIES_TOPIC) as follows:
>
> long creationTimestamp = row.getLong("creationTime");
> long rowId = row.getLong("id");
> ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
>     USER_ACTIVITIES_TOPIC, 0, creationTimestamp, String.valueOf(rowId), row.toString());
> producer.send(producerRecord);
>
> Noteworthy:
> - I'm using only one partition right now (I'm still not in production and
> I'm exploring the feature); in a production environment I would use more
> partitions
> - the message is a simple string containing a JSON object (I'm not using
> Avro or similar)
>
> - in my streaming application:
>
> public class MySession {
>
>     private final JsonObject sessionDetails;
>
>     public MySession() {
>         this.sessionDetails = new JsonObject();
>     }
>
>     public static KeyValue enhanceWithUserId_And_PutUserIdAsKey(String k, JsonObject j) {
>         int userId = cache.get(j.get("userDeviceId"));
>         return KeyValue.pair(userId, j);
>     }
>
>     public static MySession aggregate(String key, JsonObject value, MySession aggregate) {
>         // basically MySession is a collection of all the raw data
>         // that the session is composed of
>         aggregate.addRawData(value);
>         return aggregate;
>     }
>
>     public static MySession merge(String key, MySession arg1, MySession arg2) {
>         arg2.merge(arg1);
>         return arg2;
>     }
>
> }
>
>
> BTW (this will be a topic for another thread anyway...) is there a way to
> be in control of the MySession lifecycle? I was thinking of pooling them to
> reduce GC workload.
>
> thanks a lot for your precious help.
>
> Marco
>
> 2017-03-06 11:59 GMT+01:00 Damian Guy <damian....@gmail.com>:
>
> > Hi Marco,
> >
> > Your config etc look ok.
> >
> > 1. It is pretty hard to tell what is going on from just your code below,
> > unfortunately. But the behaviour doesn't seem to be in line with what I'm
> > reading in the streams code. For example, your MySession::new function
> > should be called once per record. The merger and aggregator should be
> > called pretty much immediately after that.
> >
> > 2. Data will be retained for a bit longer than the value used in
> > SessionWindows.until(..). The session store has 3 segments and we use the
> > retention period (i.e., the value of until()) to determine the segment
> > length. The segment length is calculated as:
> >
> >   Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
> >
> > which in this case is 210000 milliseconds. So maintaining 3 segments means
> > there could be data that is about 10 minutes old.
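> >
> > For example, a quick back-of-the-envelope check (assuming the minimum
> > segment interval constant is 60 seconds):
> >
> >   long retentionPeriod = TimeUnit.MINUTES.toMillis(7);               // 420000 ms, from until()
> >   long segmentLength = Math.max(retentionPeriod / (3 - 1), 60_000L); // 210000 ms
> >   long maxRetained = 3 * segmentLength;                              // 630000 ms, roughly 10 minutes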
> >
> > Also, this is completely driven by the data, and specifically by the time
> > extracted from the data. Could you provide a sample of the data going
> > through the system? It might be helpful in trying to debug the issue
> > (I'm not seeing anything obvious in the code).
> > It might also help if you can get some stack traces from the streams
> > instances that appear to be stuck.
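> >
> > For example, running something like
> >
> >   jstack <streams-pid>
> >
> > a few times while an instance looks stuck should show where the stream
> > threads are spending their time.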
> >
> > Thanks,
> > Damian
> > On Mon, 6 Mar 2017 at 09:59 Marco Abitabile <marco.abitab...@gmail.com>
> > wrote:
> >
> > > Hello,
> > >
> > > I'm playing around with the brand new SessionWindows. I have a simple
> > > topology such as:
> > >
> > > KStream<String, JsonObject> sess =
> > >     builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
> > > sess
> > >     .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
> > >     .groupByKey(stringSerde, jsonSerde)
> > >     .aggregate(
> > >         MySession::new,
> > >         MySession::aggregateSessions,
> > >         MySession::mergeSessions,
> > >         SessionWindows
> > >             .with(WINDOW_INACTIVITY_GAPS_MS)
> > >             .until(WINDOW_MAINTAIN_DURATION_MS),
> > >         mySessionSerde,
> > >         "my-session-store")  // store name is a placeholder
> > >     .filter(MySession::filterOutZeroLenghtSessions)
> > >     .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);
> > >
> > > these are the most important configurations I'm using; all the other
> > > configs are the usual serdes and hosts props:
> > >
> > > private static final long WINDOW_INACTIVITY_GAPS_MS = TimeUnit.MINUTES.toMillis(5);
> > > private static final long WINDOW_MAINTAIN_DURATION_MS = TimeUnit.MINUTES.toMillis(5 + 2);
> > >
> > > private static final Properties props = new Properties();
> > >
> > > props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
> > >     ONE_DAY);
> > >
> > > The source stream has data arriving at around 100 messages/second
> > >
> > > I'm experiencing the following behaviours:
> > >
> > > 1) MySession::new is called thousands of times, way more than the number
> > > of messages ingested (around 100 to 1000 times more). Most of these
> > > sessions never reach the end of the pipeline (even if I remove
> > > .filter(MySession::filterOutZeroLenghtSessions)), and neither
> > > MySession::aggregateSessions nor MySession::mergeSessions are invoked.
> > >
> > > Is this correct? I don't understand, maybe I've set up something wrong...
> > >
> > > 2) I can see that the stream pipeline ingests the first 15 minutes of
> > > data, and the sessions that reach SINK_TOPIC_KTABLE look good. However:
> > >    - the pipeline gets slower and slower with every second that passes
> > >    - I can see new updates to old sessions even after the
> > >      .until(WINDOW_MAINTAIN_DURATION_MS) period
> > >    - the stream consumer ingests new data at slower and slower rates as
> > >      time passes, eventually reaching almost 0 msg/sec
> > >
> > > I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only
> > > new sessions, and those that have been fired would just be removed from
> > > the session store and never touched again.
> > >
> > >
> > > At the beginning I thought that my pipeline was not set up correctly;
> > > however, I've tried to follow the docs to the letter and I could not
> > > find where things could go wrong.
> > >
> > > Do you have some hints about this?
> > > Please let me know if you need more info.
> > >
> > > thanks a lot,
> > > Marco
> > >
> >
>
