Hi Gerard,

I had a look at your Trigger implementation but did not spot anything suspicious that would cause the state size to grow. However, I noticed a few things that can be improved:
- Use ctx.getCurrentProcessingTime instead of System.currentTimeMillis to make the Trigger easier to test (there are some test harnesses that can set the processing time manually).
- Timers are not overwritten, so each timeout timer will yield a callback to onProcessingTime(). It is not possible to delete timers (so you cannot prevent onProcessingTime() from being called multiple times), but you can save the most recent timer timestamp as ValueState[Long] and compare against that state to only act on the last timer call.
- You can get the state objects just once and apply multiple operations on them, i.e.,

    val elementsToReceive = ctx.getPartitionedState(elementsToReceiveDesc)
    val elementsReceived = ctx.getPartitionedState(elementsReceivedDesc)
    elementsToReceive.update(x)
    val cnt: Int = elementsToReceive.get()
    ...

Maybe Aljoscha can check the code as well and see if he finds the reason why the state grows.

Best, Fabian

2017-09-18 15:27 GMT+02:00 gerardg <ger...@talaia.io>:
> I may be able to better know what is happening if I could get what is being
> stored in the state. Is there any way to read the RocksDB db state?
>
> Gerard
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
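
P.S. On the timer point above, here is a minimal sketch of the "only act on the last timer" idea. It deliberately does not use the Flink API so that it runs on its own: LongState is a hypothetical stand-in for a ValueState[Long], and LatestTimerGuard, onElement and timeoutMs are names I made up for illustration. In a real Trigger the same comparison would probably live in onProcessingTime(), with the state obtained via ctx.getPartitionedState.

```scala
// Hypothetical stand-in for Flink's ValueState[Long]; illustration only.
final class LongState {
  private var value: Option[Long] = None
  def update(v: Long): Unit = value = Some(v)
  def get: Option[Long] = value
  def clear(): Unit = value = None
}

// Hypothetical helper that mimics the timeout handling of a Trigger.
final class LatestTimerGuard(timeoutMs: Long) {
  private val latestTimer = new LongState

  // Called per element: computes the timestamp a timeout timer would be
  // registered for and remembers it as the most recent one.
  def onElement(nowMs: Long): Long = {
    val timerTs = nowMs + timeoutMs
    latestTimer.update(timerTs)
    timerTs
  }

  // Called per timer callback: returns true only when the firing timer is
  // the most recently registered one; older timers are ignored.
  def onProcessingTime(timerTs: Long): Boolean =
    if (latestTimer.get.contains(timerTs)) {
      latestTimer.clear() // this timeout has been handled
      true
    } else {
      false
    }
}

object LatestTimerGuardDemo {
  def main(args: Array[String]): Unit = {
    val guard = new LatestTimerGuard(timeoutMs = 1000L)
    val first = guard.onElement(nowMs = 0L)    // timer for 1000
    val second = guard.onElement(nowMs = 500L) // timer for 1500, supersedes the first
    println(guard.onProcessingTime(first))  // stale timer, ignored
    println(guard.onProcessingTime(second)) // latest timer, acted upon
  }
}
```

The stale callback still happens, but it returns immediately without doing any work, so only the last registered timeout has an effect.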