Re: Strange behaviour in Session Windows

2017-03-09 Thread Damian Guy
Hi Marco,

FYI, this has now been merged to trunk and 0.10.2. If you want to try it
out, I'd suggest checking out and building the 0.10.2 branch.

Thanks,
Damian

Re: Strange behaviour in Session Windows

2017-03-07 Thread Damian Guy
Hi Marco,

Absolutely no problem at all. I'm glad it worked out. For reference, here
is the PR that fixes the problem:
https://github.com/apache/kafka/pull/2645 and the associated JIRA
https://issues.apache.org/jira/browse/KAFKA-4851

Thanks,
Damian

Re: Strange behaviour in Session Windows

2017-03-07 Thread Marco Abitabile
Hello Damian,

Thanks a lot for your precious support.

I confirm that your workaround works perfectly for my use case.

I'll be glad to help you test the original code once the issue
you've spotted is resolved.

Thanks a lot again.

Marco.

Re: Strange behaviour in Session Windows

2017-03-06 Thread Damian Guy
Hi Marco,

I've done some testing and found that there is a performance issue when
caching is enabled. I suspect this might be what you are hitting. It looks
to me like you can work around it by doing something like:

final StateStoreSupplier sessionStore = Stores.create("session-store-name")
        .withKeys(Serdes.String())
        .withValues(mySessionSerde)
        .persistent()
        .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
        .build();


And then in your call to aggregate, pass in the sessionStore created above,
i.e.,

aggregate(
        MySession::new,
        MySession::aggregateSessions,
        MySession::mergeSessions,
        SessionWindows.with(WINDOW_INACTIVITY_GAPS_MS),
        mySessionSerde,
        sessionStore)
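
For reference, a sketch of how the two snippets above fit into the full
topology posted earlier in the thread. The constants, serdes, topics and
MySession methods are the ones Marco shared; the exact wiring is an
assumption, not Damian's verbatim code:

builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC)
        .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
        .groupByKey(stringSerde, jsonSerde)
        .aggregate(
                MySession::new,
                MySession::aggregateSessions,
                MySession::mergeSessions,
                SessionWindows.with(WINDOW_INACTIVITY_GAPS_MS),
                mySessionSerde,
                sessionStore)  // the explicitly created session store
        .filter(MySession::filterOutZeroLenghtSessions)
        .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);

Note there is no .until(..) here: with an explicitly created store, retention
comes from the sessionWindowed(..) value passed to the store supplier (7
minutes above, matching the until() value used before).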


Let us know how you get on.

Thanks,
Damian

Re: Strange behaviour in Session Windows

2017-03-06 Thread Marco Abitabile
Thanks Damian,

Sure, you are right: these details have been modified to comply with my
company's rules. However, the main points are unchanged.

The producer of the original data is a "data ingestor" that attaches a few
extra fields and produces a message such as:

row = new JsonObject({
  "id" : 12345654,
  "userDeviceId" : "",
  "creationTime" : 1488801350660,  // produced by the remote source
  "receivedTime" : 1488801363455,  // placed by my data ingestor
  "extra_data1" : 123,             //
  "extra_data2" : 456,             // extra data specific to my domain; all these values are numbers
  "extra_data3" : 789              //
})

It then sends records into SOURCE_TOPIC (which in this context is
USER_ACTIVITIES_TOPIC) as follows:

long creationTimestamp = row.getLong("creationTime");
long rowId = row.getLong("id");
// key = row id (Long), value = the JSON as a string; partition 0, record timestamp = creationTime
ProducerRecord<Long, String> producerRecord = new ProducerRecord<>(
        USER_ACTIVITIES_TOPIC, 0, creationTimestamp, rowId, row.toString());
producer.send(producerRecord);
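
For completeness, a sketch of how a matching producer might be configured.
The bootstrap address and serializer choices are assumptions, not something
stated in this thread (key = Long for the row id, value = String for the JSON):

Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<Long, String> producer = new KafkaProducer<>(producerProps);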

Noteworthy:
- I'm using only one partition right now (I'm still not in production and am
exploring the feature); in a production environment I would use more
partitions.
- The message is a simple string containing a JSON object (I'm not using Avro
or similar).

In my streaming application:

public class MySession {

    private final JsonObject sessionDetails;

    public MySession() {
        this.sessionDetails = new JsonObject();
    }

    public static KeyValue enhanceWithUserId_And_PutUserIdAsKey(String k, JsonObject j) {
        int userId = cache.get(j.get("userDeviceId"));
        return KeyValue.pair(userId, j);
    }

    public static MySession aggregate(String key, JsonObject value, MySession aggregate) {
        // basically MySession is a collection of all the raw data that the session is composed of
        aggregate.addRawData(value);
        return aggregate;
    }

    public static MySession merge(String key, MySession arg1, MySession arg2) {
        arg2.merge(arg1);
        return arg2;
    }
}


BTW (this will be a topic for another thread anyway): is there a way to be
in control of the MySession lifecycle? I was thinking of pooling instances
to reduce GC workload.

Thanks a lot for your precious help.

Marco

Re: Strange behaviour in Session Windows

2017-03-06 Thread Damian Guy
Hi Marco,

Can you try setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 and
see if that resolves the issue?
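
In code, that is (a sketch, reusing the props object from earlier in the
thread; the constant's config key is "cache.max.bytes.buffering"):

props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

Setting the cache size to 0 disables record caching entirely, so every
update is forwarded downstream immediately.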

Thanks,
Damian


Re: Strange behaviour in Session Windows

2017-03-06 Thread Damian Guy
Hi Marco,

Your config etc. looks ok.

1. It is pretty hard to tell what is going on from just your code below,
unfortunately. But the behaviour doesn't seem to be in line with what I'm
reading in the streams code. For example, your MySession::new function
should be called once per record, and the merger and aggregator should be
called pretty much immediately after that.

2. Data will be retained a bit longer than the value passed to
SessionWindows.until(..). The session store has 3 segments, and we use the
retention period (i.e., the value of until()) to determine the segment length.
The segment length is calculated as:

 Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);

which in this case is 210,000 milliseconds (3.5 minutes). So maintaining 3
segments means there could be data that is about 10 minutes old.
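
Worked through with the numbers from this thread (a sketch; the 60-second
MIN_SEGMENT_INTERVAL floor is an assumption about this version's internals):

long retentionPeriod = TimeUnit.MINUTES.toMillis(7); // until() = 5 + 2 minutes
int numSegments = 3;
long segmentLength = Math.max(retentionPeriod / (numSegments - 1), 60_000L);
// = Math.max(420_000 / 2, 60_000) = 210_000 ms = 3.5 minutes per segment
// 3 segments * 3.5 minutes is roughly 10.5 minutes of retained data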

Also, this is completely driven by the data, specifically the time extracted
from each record. Could you provide a sample of the data going through the
system? It might be helpful in trying to debug the issue (I'm not seeing
anything obvious in the code). It might also help if you could get some
stack traces from the streams instances that appear to be stuck.

Thanks,
Damian

Strange behaviour in Session Windows

2017-03-06 Thread Marco Abitabile
Hello,

I'm playing around with the brand-new SessionWindows. I have a simple
topology like this:

KStream<String, JsonObject> sess =
        builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
sess
    .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
    .groupByKey(stringSerde, jsonSerde)
    .aggregate(
        MySession::new,
        MySession::aggregateSessions,
        MySession::mergeSessions,
        SessionWindows
            .with(WINDOW_INACTIVITY_GAPS_MS)
            .until(WINDOW_MAINTAIN_DURATION_MS),
        mySessionSerde)
    .filter(MySession::filterOutZeroLenghtSessions)
    .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);

These are the most important configurations I'm using; all the other configs
are the usual serdes and host props:

private static final long WINDOW_INACTIVITY_GAPS_MS = TimeUnit.MINUTES.toMillis(5);
private static final long WINDOW_MAINTAIN_DURATION_MS = TimeUnit.MINUTES.toMillis(5 + 2);

private static final Properties props = new Properties();
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
        ONE_DAY);

The source stream has data arriving at around 100 messages/second.

I'm experiencing these behaviours:

1) MySession::new is called thousands of times, far more than the number of
messages ingested (around 100 to 1000 times more). Most of these sessions
never reach the end of the pipeline (even if I remove
.filter(MySession::filterOutZeroLenghtSessions)), and neither
MySession::aggregateSessions nor MySession::mergeSessions is invoked.

Is this correct? I don't understand; maybe I've set up something wrong...

2) I can see that the stream pipeline ingests the first 15 minutes of data,
and the sessions that reach SINK_TOPIC_KTABLE look good. However:
   - the pipeline gets slower and slower with every second that passes;
   - I can see new updates to old sessions even after the
.until(WINDOW_MAINTAIN_DURATION_MS) period;
   - the stream consumer ingests new data at slower and slower rates as time
passes, eventually reaching almost 0 msg/sec.

I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only new
sessions, and that sessions that had already been fired would simply be
removed from the session store and never touched again.


At first I thought my pipeline was not set up correctly; however, I've
followed the docs closely and could not find where things might go wrong.

Do you have any hints about this?
Please let me know if you need more info.

thanks a lot,
Marco