Hi,
One way to keep the state size under control would be:
1) for every incoming edge, store its "insertion time" in the vertex
function's state.
2) in addition, the vertex function would send itself a delayed message,
with a delay equal to the expiration duration (so that it arrives at
insertion time + expiration duration).
3) once a delayed message arrives, iterate over your edge state and remove
all the edges with "insertion time" + expiration duration <= now()
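The three steps above can be sketched roughly as follows. This is a framework-free Java model, not StateFun code, and these parts are made up for illustration:
- The class `EdgeExpiry` and its methods are hypothetical.
- A `HashMap` stands in for the function's persisted state.
- The value returned by `onEdge` is the delay you would pass to a delayed message sent to the function itself (`Context#sendAfter` in the Java SDK).
- Times are plain milliseconds supplied by the caller.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical model of one vertex function instance (not the StateFun API).
class EdgeExpiry {
    private final long expirationMillis;
    // Step 1: edge id -> insertion time (stands in for persisted state).
    private final Map<String, Long> insertionTimes = new HashMap<>();

    EdgeExpiry(long expirationMillis) {
        this.expirationMillis = expirationMillis;
    }

    // Steps 1 and 2: record the edge and return the delay for the cleanup
    // message, so that it arrives at insertion time + expiration duration.
    long onEdge(String edgeId, long now) {
        insertionTimes.put(edgeId, now); // re-inserting refreshes the time
        return expirationMillis;
    }

    // Step 3: called when a delayed message arrives; removes every edge whose
    // lifetime has elapsed and returns how many were removed.
    int onCleanup(long now) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = insertionTimes.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() + expirationMillis <= now) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return insertionTimes.size();
    }
}
```

Re-inserting an existing edge simply refreshes its insertion time, so it lives for another full expiration duration.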

To reduce the number of delayed messages, you can send a single delayed
message per fixed expiration interval (a.k.a. a tumbling window) rather
than one per edge.
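A rough sketch of the tumbling-window variant, again as a hypothetical plain-Java model. `BucketedEdgeExpiry` and its methods are made up for illustration, and the set of pending cleanups would also have to live in persisted state.

Each edge is assigned to the interval it arrived in. Only the first edge of an interval schedules a cleanup, timed for the end of that interval plus the expiration duration. Every edge in the interval has therefore expired by the time that cleanup fires.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical model: at most one delayed cleanup message per interval.
class BucketedEdgeExpiry {
    private final long intervalMillis;   // tumbling window size
    private final long expirationMillis; // edge lifetime
    private final Map<String, Long> insertionTimes = new HashMap<>();
    // Fire times of cleanup messages that have already been scheduled.
    private final TreeSet<Long> pendingCleanups = new TreeSet<>();

    BucketedEdgeExpiry(long intervalMillis, long expirationMillis) {
        this.intervalMillis = intervalMillis;
        this.expirationMillis = expirationMillis;
    }

    // Records the edge and returns the delay of a cleanup message to send,
    // or -1 if a cleanup covering this edge's interval is already pending.
    long onEdge(String edgeId, long now) {
        insertionTimes.put(edgeId, now);
        long windowEnd = (now / intervalMillis + 1) * intervalMillis;
        long fireAt = windowEnd + expirationMillis;
        if (!pendingCleanups.add(fireAt)) {
            return -1;
        }
        return fireAt - now;
    }

    // Called when a delayed message arrives: forget the cleanups that have
    // fired and drop every edge whose lifetime has elapsed.
    int onCleanup(long now) {
        pendingCleanups.headSet(now, true).clear();
        int before = insertionTimes.size();
        insertionTimes.values().removeIf(t -> t + expirationMillis <= now);
        return before - insertionTimes.size();
    }

    int size() {
        return insertionTimes.size();
    }
}
```

The trade-off is that an edge may live up to one interval longer than the expiration duration. In exchange, you send at most one delayed message per interval instead of one per edge.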

A better way to deal with this would be to wait until [1] is implemented
in StateFun (I don't expect it to take more than a couple of weeks).
Then you can simply define your state with an expiration, and StateFun
will make sure that the edge state is purged automatically a configured
amount of time after insertion.
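Until [1] lands, the semantics of "state with expiration" can be illustrated with a small expire-after-write table. This is only a hypothetical plain-Java illustration: `ExpiringTable` is made up and is not the API that [1] will add.

It loosely mirrors Flink's state TTL configured to update on write and never return expired values. An entry becomes invisible once its TTL has elapsed since the last write, and it is removed lazily the next time it is read.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical expire-after-write table (illustration only, not StateFun).
class ExpiringTable<K, V> {
    private static final class Timestamped<T> {
        final T value;
        final long writtenAt;

        Timestamped(T value, long writtenAt) {
            this.value = value;
            this.writtenAt = writtenAt;
        }
    }

    private final long ttlMillis;
    private final Map<K, Timestamped<V>> entries = new HashMap<>();

    ExpiringTable(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Every write resets the entry's expiration clock.
    void set(K key, V value, long now) {
        entries.put(key, new Timestamped<>(value, now));
    }

    // Expired entries are never returned and are purged on access.
    Optional<V> get(K key, long now) {
        Timestamped<V> entry = entries.get(key);
        if (entry == null) {
            return Optional.empty();
        }
        if (entry.writtenAt + ttlMillis <= now) {
            entries.remove(key);
            return Optional.empty();
        }
        return Optional.ofNullable(entry.value);
    }

    // Number of physically stored entries, including expired-but-unread ones.
    int storedEntries() {
        return entries.size();
    }
}
```

Lazy removal alone does not shrink state for keys that are never read again. That is why Flink's state TTL also offers background cleanup strategies, such as incremental cleanup or cleanup during full snapshots.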

I hope this helps,
Good luck!
Igal.


[1] https://issues.apache.org/jira/browse/FLINK-17644

On Fri, May 8, 2020 at 1:00 PM m@xi <makisnt...@gmail.com> wrote:

> Dear Igal, Very insightful answer. Thanks.
>
> Igal Shilman wrote
> An alternative approach would be to implement a *tumbling window* per
> vertex (a stateful function instance) by sending to itself a delayed
> message [2]. When that specific delayed message arrives you would have to
> purge the oldest edges by examining the edges in state.
>
> Indeed, delayed asynchronous messages are a workaround for simulating a
> *tumbling window* in SF. I believe you assume that a message received by a
> stateful function contains multiple edges, all of which can be delayed by
> a certain amount of time. Therefore, when a function receives a message, it
> purges all of its existing edges and incorporates the new (delayed) ones.
> Correct? Nevertheless, if you think about it, the delay is essentially the
> *window slide*. Now, what about the *window range*?
>
> Igal Shilman wrote
> Data stream windows are not yet supported in statefun, but it seems like
> the main motivation here is to purge old edges? If this is the case perhaps
> we need to integrate state TTL [1] into persisted values/persistedtables.
>
> I was not aware of TTL; very interesting and handy. Essentially,
> TTL can enforce the *window range*, i.e., attach to each tuple
> received by a stateful function its lifespan/duration. So, the first TTL
> attribute sets the range: *StateTtlConfig.newBuilder(Time.seconds(window
> range))*. Therefore, by *combining TTL and SF Delayed Messaging* we can
> *simulate sliding window* processing on a per-stateful-function basis.
> However, TTL is a Flink construct and I am not sure I understood it
> correctly. You said
>
> Igal Shilman wrote
> If this is the case perhaps *we need to integrate* state TTL [1] into
> persisted values/persistedtables.
>
> If this is the case, then I believe it would be great to integrate TTL
> into Persisted Values/Tables
> <https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/sdk/java.html#persistence>.
>
> ------------------------------
> Sent from the Apache Flink User Mailing List archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>
