Re: Strange behaviour in Session Windows
Hi Marco,

FYI, this has now been merged to trunk and 0.10.2. If you want to try it out, I'd suggest checking out and building the 0.10.2 branch.

Thanks,
Damian

On Tue, 7 Mar 2017 at 17:57 Damian Guy wrote:
> [...]
Re: Strange behaviour in Session Windows
Hi Marco,

Absolutely no problem at all. I'm glad it worked out. For reference, here is a PR that fixes the problem: https://github.com/apache/kafka/pull/2645 and the associated JIRA: https://issues.apache.org/jira/browse/KAFKA-4851

Thanks,
Damian

On Tue, 7 Mar 2017 at 17:48 Marco Abitabile wrote:
> [...]
Re: Strange behaviour in Session Windows
Hello Damian,

Thanks a lot for your invaluable support.

I can confirm that your workaround works perfectly for my use case.

I'll be glad to help you test the original code once the issue you've spotted is fixed.

Thanks a lot again.

Marco.

On 06 Mar 2017 at 16:03, "Damian Guy" wrote:
> [...]
Re: Strange behaviour in Session Windows
Hi Marco,

I've done some testing and found that there is a performance issue when caching is enabled. I suspect this might be what you are hitting. It looks to me that you can work around this by doing something like:

    final StateStoreSupplier sessionStore =
        Stores.create("session-store-name")
              .withKeys(Serdes.String())
              .withValues(mySessionSerde)
              .persistent()
              .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
              .build();

And then in your call to aggregate, pass in the sessionStore created above, i.e.,

    aggregate(
        MySession::new,
        MySession::aggregateSessions,
        MySession::mergeSessions,
        SessionWindows.with(WINDOW_INACTIVITY_GAPS_MS),
        mySessionSerde,
        sessionStore)

Let us know how you get on.

Thanks,
Damian

On Mon, 6 Mar 2017 at 13:27 Marco Abitabile wrote:
> [...]
Re: Strange behaviour in Session Windows
Thanks Damian,

Sure, you are right: these details have been modified to comply with my company's rules. However, the main points are unchanged.

The producer of the original data is a "data ingestor" that attaches a few extra fields and produces a message such as:

    row = new JsonObject({
        "id" : 12345654,
        "userDeviceId" : "",
        "creationTime" : 1488801350660, // produced by the remote source
        "receivedTime" : 1488801363455, // set by my data ingestor
        "extra_data1" : 123,            //
        "extra_data2" : 456,            // extra data specific to my domain; all of these are numbers
        "extra_data2" : 789             //
    })

It then sends records to SOURCE_TOPIC (which in this context is USER_ACTIVITIES_TOPIC) as follows:

    long creationTimestamp = row.getLong("creationTime");
    long rowId = row.getLong("id");
    ProducerRecord<Long, String> producerRecord =
        new ProducerRecord<>(USER_ACTIVITIES_TOPIC, 0, creationTimestamp, rowId, row.toString());
    producer.send(producerRecord);

Noteworthy:
- I'm using only one partition for now (I'm not in production yet and I'm still exploring the feature); in production I would use more partitions.
- The message is a simple string containing a JSON object (I'm not using Avro or similar).
- In my streaming application:

    public class MySession {

        private final JsonObject sessionDetails;

        public MySession() {
            this.sessionDetails = new JsonObject();
        }

        public static KeyValue enhanceWithUserId_And_PutUserIdAsKey(String k, JsonObject j) {
            int userId = cache.get(j.get("userDeviceId"));
            return KeyValue.pair(userId, j);
        }

        public static MySession aggregate(String key, JsonObject value, MySession aggregate) {
            // basically, MySession is a collection of all the raw data that the session is composed of
            aggregate.addRawData(value);
            return aggregate;
        }

        public static MySession merge(String key, MySession arg1, MySession arg2) {
            arg2.merge(arg1);
            return arg2;
        }
    }

BTW (this will be a topic for another thread anyway...), is there a way to be in control of the MySession lifecycle? I was thinking of pooling them to reduce GC workload.

Thanks a lot for your invaluable help.

Marco

2017-03-06 11:59 GMT+01:00 Damian Guy:
> [...]
Re: Strange behaviour in Session Windows
Hi Marco,

Can you try setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 and see if that resolves the issue?

Thanks,
Damian

On Mon, 6 Mar 2017 at 10:59 Damian Guy wrote:
> [...]
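As a rough illustration of the suggestion above, the record cache can be switched off by setting its size to zero in the properties handed to the streams application. The sketch below uses the literal key "cache.max.bytes.buffering" (the string behind the StreamsConfig cache constant) so that it runs without a Kafka dependency; the class and method names are hypothetical and only for illustration.

```java
import java.util.Properties;

final class DisableStreamsCache {
    // String key behind the StreamsConfig cache-size constant; the literal is
    // used so this sketch has no Kafka dependency.
    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";

    // Returns a copy of the given properties with the record cache disabled.
    // (Hypothetical helper, not part of any Kafka API.)
    static Properties withCachingDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put(CACHE_MAX_BYTES_BUFFERING, 0); // 0 bytes means no caching
        return props;
    }

    public static void main(String[] args) {
        Properties props = withCachingDisabled(new Properties());
        System.out.println(props);
    }
}
```

With caching off, every update is forwarded downstream immediately, so expect more output records per session than with the cache enabled.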
Re: Strange behaviour in Session Windows
Hi Marco,

Your config etc. looks OK.

1. It is pretty hard to tell what is going on from just your code below, unfortunately. But the behaviour doesn't seem to be in line with what I'm reading in the streams code. For example, your MySession::new function should be called once per record. The merger and aggregator should be called pretty much immediately after that.

2. Data will be retained for a bit longer than the value used in SessionWindows.until(..). The session store has 3 segments and we use the retention period (i.e., the value of until()) to determine the segment length. The segment length is calculated as:

    Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);

Which in this case is 210000 milliseconds (3.5 minutes). So maintaining 3 segments means there could be data that is about 10 minutes old.

Also, this is completely driven by the data, and specifically by the time extracted from the data. I'm not sure if you can provide a sample of the data going through the system? It might be helpful in trying to debug the issue. (I'm not seeing anything obvious in the code.)
Also, it might help if you can get some stack traces from the streams instances that appear to be stuck.

Thanks,
Damian

On Mon, 6 Mar 2017 at 09:59 Marco Abitabile wrote:
> [...]
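The segment arithmetic in point 2 can be checked with a few lines of Java. This is only a sketch of the formula quoted in the message: the 7-minute retention comes from until(5 minutes + 2 minutes) in the original question, while the one-minute MIN_SEGMENT_INTERVAL value and the class and method names are assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;

final class SegmentLengthExample {
    // Assumed value: the thread does not state MIN_SEGMENT_INTERVAL; one minute is used here.
    static final long MIN_SEGMENT_INTERVAL_MS = 60_000L;

    // Same formula as in the message: max(retention / (numSegments - 1), minimum interval).
    static long segmentLengthMs(long retentionPeriodMs, int numSegments) {
        return Math.max(retentionPeriodMs / (numSegments - 1), MIN_SEGMENT_INTERVAL_MS);
    }

    public static void main(String[] args) {
        long retentionMs = TimeUnit.MINUTES.toMillis(7); // until(5 min + 2 min)
        int numSegments = 3;
        long segmentMs = segmentLengthMs(retentionMs, numSegments);
        // 420000 / 2 = 210000 ms, i.e. 3.5 minutes per segment
        System.out.println("segment length (ms): " + segmentMs);
        // 3 segments * 210000 ms = 630000 ms, i.e. about 10 minutes of data
        System.out.println("span of 3 segments (ms): " + segmentMs * numSegments);
    }
}
```

Under these assumptions, three live segments span 10.5 minutes, which is why data older than the 7-minute retention can still be present.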
Strange behaviour in Session Windows
Hello,

I'm playing around with the brand new SessionWindows. I have a simple topology such as:

    KStream<String, JsonObject> sess =
        builder.stream(stringSerde, jsonSerde, SOURCE_TOPIC);
    sess
        .map(MySession::enhanceWithUserId_And_PutUserIdAsKey)
        .groupByKey(stringSerde, jsonSerde)
        .aggregate(
            MySession::new,
            MySession::aggregateSessions,
            MySession::mergeSessions,
            SessionWindows
                .with(WINDOW_INACTIVITY_GAPS_MS)
                .until(WINDOW_MAINTAIN_DURATION_MS),
            mySessionSerde)
        .filter(MySession::filterOutZeroLenghtSessions)
        .to(windowedSerde, mySessionSerde, SINK_TOPIC_KTABLE);

These are the most important configuration values I'm using; all the other configs are the usual serdes and hosts props:

    private static final long WINDOW_INACTIVITY_GAPS_MS = TimeUnit.MINUTES.toMillis(5);
    private static final long WINDOW_MAINTAIN_DURATION_MS =
        TimeUnit.MINUTES.toMillis(5) + TimeUnit.MINUTES.toMillis(2);

    private static final Properties props = new Properties();

    props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, ONE_DAY);

The source stream has data arriving at around 100 messages/second.

I'm experiencing the following behaviours:

1) MySession::new is called thousands of times, far more than the number of messages ingested (around 100 to 1000 times more). Most of these sessions never reach the end of the pipeline (even if I remove .filter(MySession::filterOutZeroLenghtSessions)), and neither MySession::aggregateSessions nor MySession::mergeSessions is invoked.

Is this correct? I don't understand; maybe I've set something up wrong...

2) I can see that the stream pipeline can ingest the first 15 minutes of data, and the sessions that reach SINK_TOPIC_KTABLE look good. However:
- with every second that passes, the pipeline gets slower and slower;
- I can see new updates to old sessions even after the .until(WINDOW_MAINTAIN_DURATION_MS) period;
- the stream consumer ingests new data at slower and slower rates as time passes, eventually reaching almost 0 msg/sec.

I was expecting that after WINDOW_MAINTAIN_DURATION_MS I would see only new sessions, and that those that have already fired would just be removed from the session store and never touched again.

At the beginning I thought that my pipeline was not set up correctly; however, I've tried to follow the docs slavishly and I could not find where things could go wrong.

Do you have any hints about this? Please let me know if you need more info.

Thanks a lot,
Marco
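For readers following the thread, here is a simplified model of how session windows with an inactivity gap are expected to behave for a single key: each record starts as its own session and is then merged with every existing session that lies within the gap. This is a sketch for intuition only, not Kafka's implementation; the Session class and the add method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionMergeSketch {
    /** A session covering [start, end] in event-time milliseconds, with a record count. */
    static final class Session {
        final long start;
        final long end;
        final int count;

        Session(long start, long end, int count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }
    }

    /** Adds one record timestamp, merging every existing session within gapMs of it. */
    static List<Session> add(List<Session> sessions, long ts, long gapMs) {
        long start = ts;
        long end = ts;
        int count = 1; // the new record starts as a session of its own
        List<Session> result = new ArrayList<>();
        for (Session s : sessions) {
            boolean withinGap = s.end >= ts - gapMs && s.start <= ts + gapMs;
            if (withinGap) {
                // merge step: the existing session is absorbed into the new one
                start = Math.min(start, s.start);
                end = Math.max(end, s.end);
                count += s.count;
            } else {
                result.add(s);
            }
        }
        result.add(new Session(start, end, count));
        return result;
    }

    public static void main(String[] args) {
        long gapMs = 5 * 60_000L; // 5-minute inactivity gap, as in the question
        List<Session> sessions = new ArrayList<>();
        // The last record arrives out of order and bridges the two earlier sessions.
        for (long ts : new long[] {0L, 60_000L, 500_000L, 280_000L}) {
            sessions = add(sessions, ts, gapMs);
            System.out.println("after ts=" + ts + ": " + sessions.size() + " session(s)");
        }
    }
}
```

In this model the initializer runs once per record and merges follow immediately, which matches the expectation stated in the reply; a much higher initializer count points at something other than the windowing semantics.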