[ https://issues.apache.org/jira/browse/FLINK-21308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17279870#comment-17279870 ]
Stephan Pelikan commented on FLINK-21308: ----------------------------------------- I've created a draft: [https://github.com/apache/flink-statefun/compare/release-2.2.1...stephanpelikan:feature/FLINK-21308_unsendAfter?expand=1] It is based on release-2.2.1, the version I'm using at the moment. I guess it can be upgraded easily ;). The base idea is to # generate a message id # store the message id instead of the message in the list of delayed messages for a certain timestamp # store the message in a map (key = message id) Doing so the message can be found and deleted on demand. The implemenation is downwardly compatible because the list of messages for a certain timestamp can hold either messages (from previous checkpoints/savepoints) or strings (message ids). In the major release after the next major release this can be removed. One downside of that compatiblity is that adding messages is nearly as fast as before but deleting messages requires to rewrite the entire list of message ids for a certain timestamp (see [https://github.com/apache/flink-statefun/compare/release-2.2.1...stephanpelikan:feature/FLINK-21308_unsendAfter?expand=1#diff-0244beb47e964409e2fa75e0f74a4197c01256d2aefe4b4dbba91bb1aadf259bR103)]. For my use case the most timers will be removed and I would prefer a solution which is also fast on removing, but on the other hand it is not likely to have huge lists of messages for a certain timestamp and this approach is not as expensive as it might seem. For constructing the ids I used a time and location based UUID-generator. I used the location-part as namespace for the map and the time-part as internal id instead of the original UUID to save storage. The user will receive the entire UUID. Using UUIDs is just a suggestion and is originated by the fact that I don't know much about other possibilities within Flink. On hint: Using the location-part which is based on the systems ethernet-address as the namespace should hopefully force that removing messages will be done in the map-state of the same machine as inserting. I hope this assumption is correct. I named the Context-method for that "unsendAfter". Feel free to suggest a better name. Last thing to mention: I did not test this yet. I will do this afterwards but I wanted to share my proposal as soon as possible. So you have the possibility to give me feedback before I have to invest much effort in testing something which might not be used. Sorry for that, my time is limited :(. > Cancel "sendAfter" > ------------------ > > Key: FLINK-21308 > URL: https://issues.apache.org/jira/browse/FLINK-21308 > Project: Flink > Issue Type: New Feature > Components: Stateful Functions > Reporter: Stephan Pelikan > Priority: Major > > As a user I want to cancel delayed sent messages not needed any more to keep > state clean. > Use case: > {quote}My use-case is processing business events of customers. Those events > are triggered by ourself or by the customer depending of what's the current > state of the ongoing customer's business use-case. We need to monitor > delayed/missing business events which belong to previous events. For example: > the customer has to confirm something we did. Depending on what it is the > confirmation has to be within hours, days or even months. If there is a delay > we need to know. But if the customer confirms in time we want to cleanup to > keep the state small. > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)