[ 
https://issues.apache.org/jira/browse/FLINK-21308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279870#comment-17279870
 ] 

Stephan Pelikan commented on FLINK-21308:
-----------------------------------------

I've created a draft: 
[https://github.com/apache/flink-statefun/compare/release-2.2.1...stephanpelikan:feature/FLINK-21308_unsendAfter?expand=1]

It is based on release-2.2.1, the version I'm using at the moment. I guess it 
can easily be rebased onto a newer version ;).

The basic idea is to
 # generate a message id
 # store the message id instead of the message in the list of delayed messages 
for a certain timestamp
 # store the message in a map (key = message id)

This way the message can be looked up and deleted on demand.
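
A minimal in-memory sketch of the three steps above, not the code from the draft. The class name DelayedMessageStore, its methods, and the use of UUID.randomUUID() are assumptions for illustration; the real state lives in Flink state, not in HashMaps. As a simplification, a cancelled id stays in the timestamp list and is skipped when the timestamp fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/** Hypothetical stand-in for the delayed-message state; not the StateFun implementation. */
class DelayedMessageStore<M> {
    // timestamp -> ids of the messages due at that timestamp
    private final Map<Long, List<String>> idsByTimestamp = new HashMap<>();
    // message id -> message
    private final Map<String, M> messagesById = new HashMap<>();

    /** Steps 1 to 3: generate an id, record it under the timestamp, store the message by id. */
    String add(long timestamp, M message) {
        String id = UUID.randomUUID().toString(); // assumption: any unique id works here
        idsByTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(id);
        messagesById.put(id, message);
        return id;
    }

    /** Cancels a delayed message; returns true if it was still pending. */
    boolean remove(String id) {
        return messagesById.remove(id) != null;
    }

    /** Returns and clears the messages due at the timestamp, skipping cancelled ids. */
    List<M> fire(long timestamp) {
        List<M> due = new ArrayList<>();
        List<String> ids = idsByTimestamp.remove(timestamp);
        if (ids != null) {
            for (String id : ids) {
                M message = messagesById.remove(id);
                if (message != null) {
                    due.add(message);
                }
            }
        }
        return due;
    }
}
```

The id returned by add is what a caller would keep in order to cancel the message later.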

The implementation is backward compatible because the list of messages for a 
certain timestamp can hold either messages (from previous 
checkpoints/savepoints) or strings (message ids). This compatibility can be 
removed in the major release after the next major release. One downside of 
that compatibility is that adding messages is nearly as fast as before, but 
deleting a message requires rewriting the entire list of message ids for its 
timestamp (see 
[https://github.com/apache/flink-statefun/compare/release-2.2.1...stephanpelikan:feature/FLINK-21308_unsendAfter?expand=1#diff-0244beb47e964409e2fa75e0f74a4197c01256d2aefe4b4dbba91bb1aadf259bR103]).
 In my use case most timers will be removed, so I would prefer a solution that 
is also fast on removal. On the other hand, huge lists of messages for a 
single timestamp are unlikely, so this approach is not as expensive as it 
might seem.
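
To illustrate the cost of deleting, here is a rough sketch of such a mixed list, assuming entries are either a legacy message object or a String id. LegacyMessage and MixedTimestampList are hypothetical names, not types from the draft or from StateFun.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical legacy payload; stands in for messages restored from older checkpoints. */
final class LegacyMessage {
    final String payload;

    LegacyMessage(String payload) {
        this.payload = payload;
    }
}

final class MixedTimestampList {
    /**
     * Rewrites the entries of one timestamp without the given id.
     * Every entry is visited, so the cost is linear in the list length.
     */
    static List<Object> withoutId(List<Object> entries, String idToRemove) {
        List<Object> rewritten = new ArrayList<>(entries.size());
        for (Object entry : entries) {
            // Strings are message ids; anything else is a legacy message and is always kept.
            if (entry instanceof String && entry.equals(idToRemove)) {
                continue;
            }
            rewritten.add(entry);
        }
        return rewritten;
    }
}
```

Appending an id only touches the end of the list, while removal copies every remaining entry, which matches the asymmetry described above.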

For constructing the ids I used a time- and location-based UUID generator. I 
use the location part as the namespace for the map and the time part as the 
internal id instead of the original UUID to save storage. The user receives 
the entire UUID. Using UUIDs is just a suggestion; it stems from the fact 
that I don't know much about other possibilities within Flink.
One hint: using the location part, which is based on the system's Ethernet 
address, as the namespace should hopefully ensure that messages are removed 
from the map state of the same machine that inserted them. I hope this 
assumption is correct.
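
As an illustration of splitting such an id, the sketch below uses java.util.UUID, which exposes the node and timestamp fields of a version 1 (time-based) UUID. The class TimeBasedId and its method names are hypothetical, and the draft may use a different generator or encoding. The clock-sequence field is ignored here, so uniqueness of the time part alone is an assumption.

```java
import java.util.UUID;

/** Hypothetical helper that splits a version 1 UUID into location and time parts. */
final class TimeBasedId {
    final long node;      // location part: 48-bit node field, typically the Ethernet address
    final long timestamp; // time part: 60-bit count of 100 ns intervals

    TimeBasedId(UUID uuid) {
        // node() and timestamp() are only defined for version 1 UUIDs.
        if (uuid.version() != 1) {
            throw new IllegalArgumentException("not a time-based (version 1) UUID");
        }
        this.node = uuid.node();
        this.timestamp = uuid.timestamp();
    }

    /** Namespace for the map state, derived from the location part. */
    String namespace() {
        return Long.toHexString(node);
    }

    /** Compact internal id, derived from the time part. */
    long internalId() {
        return timestamp;
    }
}
```

Note that the JDK can parse version 1 UUIDs (for example via UUID.fromString) but does not generate them, so the generator itself has to come from elsewhere.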

I named the new Context method "unsendAfter". Feel free to suggest a 
better name.

One last thing: I have not tested this yet. I will do so later, but I wanted 
to share my proposal as soon as possible so that you can give me feedback 
before I invest much effort in testing something which might not be used. 
Sorry for that, my time is limited :(.

 

> Cancel "sendAfter"
> ------------------
>
>                 Key: FLINK-21308
>                 URL: https://issues.apache.org/jira/browse/FLINK-21308
>             Project: Flink
>          Issue Type: New Feature
>          Components: Stateful Functions
>            Reporter: Stephan Pelikan
>            Priority: Major
>
> As a user I want to cancel delayed messages that are no longer needed, to 
> keep state clean.
> Use case:
> {quote}My use-case is processing business events of customers. Those events 
> are triggered by ourselves or by the customer, depending on the current 
> state of the customer's ongoing business use-case. We need to monitor 
> delayed/missing business events which belong to previous events. For example: 
> the customer has to confirm something we did. Depending on what it is the 
> confirmation has to be within hours, days or even months. If there is a delay 
> we need to know. But if the customer confirms in time we want to cleanup to 
> keep the state small.
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
