Hi Flink dev community! I'm adding an event delay function for certain events using Flink timers, but occasionally the ValueState seems to lose the event. I see a ValueState loss of ~20 events out of 1250, and I don't see a pattern to it yet.
The code is below:

    public class ProjectionEvictionEventDelayFunction
            extends KeyedProcessFunction<String, JsonNode, JsonNode> {

        /**
         * ProjectionEvictionEventDelayFunction delays projection eviction events by a
         * configured amount of time using Flink's onTimer().
         */
        private final static Logger logger = LogManager.getLogger(
                ProjectionEvictionEventDelayFunction.class.getName());

        private ValueState<String> originalEventState;
        private final ObjectMapper mapper = new ObjectMapper();

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<String> originalEventDescriptor =
                    new ValueStateDescriptor<String>("originalEventState", Types.STRING);
            originalEventState = getRuntimeContext().getState(originalEventDescriptor);
        }

        @Override
        public void processElement(JsonNode event, Context context,
                Collector<JsonNode> collector) throws Exception {
            if (GantryEventUtils.isEvictionEvent(event)) {
                // If it is the eviction event, add a timer with a delay.
                originalEventState.update(mapper.writeValueAsString(event));
                // TODO: Remove comment
                logger.info("Storing eviction event"
                        + event.get(GantryEventField.EVENT_ID.getFieldName()).asText());
                context.timerService().registerProcessingTimeTimer(
                        context.timestamp() + 10000); // 10 seconds.
            } else {
                // If it is not the eviction event, just send it through.
                // TODO: Remove comment
                logger.info("Emitting non eviction event"
                        + event.get(GantryEventField.EVENT_ID.getFieldName()).asText());
                collector.collect(event);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext context,
                Collector<JsonNode> collector) throws Exception {
            String event = originalEventState.value();
            try {
                // Retrieve the original event from the ValueState
                JsonNode originalEvent = mapper.readTree(event);
                // Clear the original event state for this key
                originalEventState.clear();
                // Emit the original event with a timestamp
                collector.collect(originalEvent);
            } catch (Exception e) {
                logger.error("Failed to emit eviction event " + event
                        + context.getCurrentKey() + timestamp);
            }
        }
    }

Thanks,
Kireet
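
P.S. One thing that may be worth ruling out (an untested guess, not a confirmed diagnosis): a ValueState holds a single value per key, so if two eviction events for the same key arrive within the 10-second window, the second update() overwrites the first. The first timer would then emit the newer event and clear the state, and the second timer would find nothing to emit. The sketch below is plain Java, not Flink code; the class SingleSlotStateDemo, its methods, and the event IDs e1/e2/e3 are hypothetical stand-ins (a HashMap for keyed state, a list for timers) that only illustrate the overwrite.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java stand-in for keyed ValueState plus timers. */
class SingleSlotStateDemo {

    /** A pending timer for one key. */
    private static final class Timer {
        final String key;
        final long fireAt;

        Timer(String key, long fireAt) {
            this.key = key;
            this.fireAt = fireAt;
        }
    }

    // One slot per key, mimicking a ValueState<String> scoped to the current key.
    private final Map<String, String> slotByKey = new HashMap<>();
    private final List<Timer> timers = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    /** Mirrors processElement for an eviction event: store it, register a timer. */
    void onEvictionEvent(String key, String eventId, long now, long delayMs) {
        slotByKey.put(key, eventId); // like update(): replaces any earlier value
        timers.add(new Timer(key, now + delayMs));
    }

    /** Fires every timer due at or before 'now', earliest first. */
    void advanceTo(long now) {
        timers.sort(Comparator.comparingLong(t -> t.fireAt));
        List<Timer> remaining = new ArrayList<>();
        for (Timer t : timers) {
            if (t.fireAt <= now) {
                // like value() followed by clear()
                String stored = slotByKey.remove(t.key);
                if (stored != null) {
                    emitted.add(stored);
                }
                // stored == null: nothing left to emit for this timer
            } else {
                remaining.add(t);
            }
        }
        timers.clear();
        timers.addAll(remaining);
    }

    /** Two events for key "k" three seconds apart, one event for another key. */
    static List<String> run() {
        SingleSlotStateDemo demo = new SingleSlotStateDemo();
        demo.onEvictionEvent("k", "e1", 0L, 10_000L);
        demo.onEvictionEvent("other", "e3", 0L, 10_000L);
        demo.onEvictionEvent("k", "e2", 3_000L, 10_000L); // overwrites e1
        demo.advanceTo(20_000L);
        return demo.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If this matches what the logs show (a "Storing eviction event" line for an ID that is never emitted, followed by a "Failed to emit eviction event" line with a null event for the same key), then keeping the pending events in a per-key list or map keyed by timer timestamp, rather than in a single value, might be one way to avoid the overwrite.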