Hi Gerard,

I had a look at your Trigger implementation but did not spot anything
suspicious that would cause the state size to grow.
However, I noticed a few things that can be improved:

- use ctx.getCurrentProcessingTime instead of System.currentTimeMillis to
make the Trigger easier to test (there are some test harnesses that can set
the processing time manually)
- timers are not overwritten, so each timeout timer will yield a callback
to onProcessingTime(). It is not possible to delete timers (so you cannot
prevent the onProcessingTime() method from being called multiple times), but
you can save the most recent timer timestamp as ValueState[Long] and compare
against the state to only act on the last timer call.
- You can get the state objects just once and apply multiple operations on
the same state object, e.g.,

val elementsToReceive = ctx.getPartitionedState(elementsToReceiveDesc)
val elementsReceived = ctx.getPartitionedState(elementsReceivedDesc)

elementsToReceive.update(x)
val cnt: Int = elementsToReceive.get()
...
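
To illustrate the second point, here is a rough, Flink-free sketch of the
"only act on the last timer" idea. It is not taken from your code: the names
TimeoutTracker, registerTimeout and onTimer are made up for illustration, the
private var stands in for a ValueState[Long], and the `now` argument plays the
role of ctx.getCurrentProcessingTime (which is also what makes it easy to test
with a manually set time).

```scala
// Hypothetical, Flink-free model of the timer bookkeeping described above.
final class TimeoutTracker(timeoutMs: Long) {
  // Stands in for a ValueState[Long] that holds the most recent timer timestamp.
  private var latestTimer: Option[Long] = None

  // Call for every element. `now` plays the role of ctx.getCurrentProcessingTime.
  // Returns the timestamp at which a processing-time timer would be registered.
  def registerTimeout(now: Long): Long = {
    val timestamp = now + timeoutMs
    latestTimer = Some(timestamp)
    timestamp
  }

  // Call from the timer callback. Returns true only for the latest timer,
  // so callbacks of older (stale) timers are ignored.
  def onTimer(time: Long): Boolean =
    if (latestTimer.contains(time)) {
      latestTimer = None // clear the stored timestamp once the timeout was handled
      true
    } else {
      false
    }
}

object TimeoutTrackerDemo extends App {
  val tracker = new TimeoutTracker(timeoutMs = 100L)
  val first = tracker.registerTimeout(now = 1000L)  // timer at 1100
  val second = tracker.registerTimeout(now = 1050L) // timer at 1150
  println(tracker.onTimer(first))  // false: a newer timer exists
  println(tracker.onTimer(second)) // true: this is the latest timer
}
```

In the actual Trigger, onProcessingTime() would do the same comparison against
the value read from the state and return TriggerResult.CONTINUE for stale
timers.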

Maybe Aljoscha can check the code as well and see if he finds the reason
why the state grows.

Best, Fabian

2017-09-18 15:27 GMT+02:00 gerardg <ger...@talaia.io>:

> I may be able to better know what is happening if I could get what is being
> stored in the state. Is there any way to read the RocksDB db state?
>
> Gerard
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
