da-daken opened a new issue, #691:
URL: https://github.com/apache/flink-agents/issues/691

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.
   
   ### Description
   
   ### Problem
   ActionState is persisted to the external ActionStateStore to support durable 
execution recovery. The current production store implementation is Kafka.
   When a checkpoint completes, the following call prunes the store:
   ```java
   actionStateStore.pruneState(entry.getKey(), entry.getValue());
   ```
   The intent is to prune action states for a Flink key up to the completed 
sequence number. However, the current KafkaActionStateStore.pruneState(...) 
implementation only removes matching entries from the in-memory actionStates 
cache; it does not delete the corresponding records from the Kafka topic. Once 
a successful checkpoint covers the completed sequence number, those historical 
records are no longer needed for recovery, yet they can accumulate indefinitely 
in Kafka.
   ### Solution
   After a checkpoint has completed, pruneState should also delete the pruned 
records from the Kafka topic used by KafkaActionStateStore. Because the Kafka 
topic is configured with cleanup.policy=compact, writing a tombstone record 
(key = stateKey, value = null) for each pruned key allows Kafka log compaction 
to eventually remove all historical records for that ActionState key.
   ```java
   producer.send(new ProducerRecord<>(topic, stateKey, null));
   ```
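
To illustrate the proposal end to end, here is a minimal, dependency-free sketch. It is not the actual KafkaActionStateStore code: `SketchStore`, the `"<flinkKey>_<seqNum>"` key layout, the `sender` callback (standing in for `producer.send(...)`), and the `compact` helper (a simplified model of log compaction) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class TombstonePruneSketch {

    /** Hypothetical stand-in for KafkaActionStateStore; the real class differs. */
    static class SketchStore {
        // Mirrors the in-memory actionStates cache: stateKey -> serialized state.
        private final Map<String, String> actionStates = new LinkedHashMap<>();
        // Stands in for producer.send(new ProducerRecord<>(topic, key, value)).
        private final BiConsumer<String, String> sender;

        SketchStore(BiConsumer<String, String> sender) {
            this.sender = sender;
        }

        // Assumed key layout "<flinkKey>_<seqNum>"; the real layout may differ.
        static String stateKey(String flinkKey, long seqNum) {
            return flinkKey + "_" + seqNum;
        }

        void put(String flinkKey, long seqNum, String state) {
            String key = stateKey(flinkKey, seqNum);
            actionStates.put(key, state);
            sender.accept(key, state);
        }

        int cachedCount() {
            return actionStates.size();
        }

        /** Drops cached entries up to completedSeqNum and emits one tombstone per entry. */
        void pruneState(String flinkKey, long completedSeqNum) {
            // Collect first, then remove, to avoid mutating the map while iterating.
            List<String> pruned = new ArrayList<>();
            for (String key : actionStates.keySet()) {
                int sep = key.lastIndexOf('_');
                boolean sameFlinkKey = key.substring(0, sep).equals(flinkKey);
                long seqNum = Long.parseLong(key.substring(sep + 1));
                if (sameFlinkKey && seqNum <= completedSeqNum) {
                    pruned.add(key);
                }
            }
            for (String key : pruned) {
                actionStates.remove(key);
                sender.accept(key, null); // tombstone: value = null
            }
        }
    }

    /** Simplified compaction model: keep the latest value per key, drop tombstoned keys. */
    static Map<String, String> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : log) {
            if (record[1] == null) {
                latest.remove(record[0]);
            } else {
                latest.put(record[0], record[1]);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> log = new ArrayList<>(); // stand-in for the Kafka topic
        SketchStore store = new SketchStore((k, v) -> log.add(new String[] {k, v}));
        store.put("user-1", 1L, "a");
        store.put("user-1", 2L, "b");
        store.put("user-1", 3L, "c");
        store.put("user-2", 1L, "d");
        store.pruneState("user-1", 2L);
        System.out.println(compact(log).keySet()); // prints [user-1_3, user-2_1]
    }
}
```

In a real Kafka topic the cleanup is not immediate: compaction runs in the background, and tombstones themselves are retained for `delete.retention.ms` before they are dropped, so the topic shrinks eventually rather than at prune time.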
   
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.