Hi Flink dev community!

I'm adding an event delay function for certain events using Flink timers,
but occasionally the ValueState seems to lose the event. I see a ValueState
loss of ~20 events out of 1250, and I don't see a pattern to it yet.

The code is below:
public class ProjectionEvictionEventDelayFunction extends
KeyedProcessFunction<String, JsonNode, JsonNode> {
/**
* ProjectionEvictionEventDelayFunction handles delay projection eviction
events by a configured amount
* of time using Flink's onTimer().
**/

private final static Logger logger = LogManager.getLogger(
ProjectionEvictionEventDelayFunction.class.getName());
private ValueState<String> originalEventState;

private final ObjectMapper mapper = new ObjectMapper();

@Override
public void open(Configuration config) {
ValueStateDescriptor<String> originalEventDescriptor = new
ValueStateDescriptor<String>(
"originalEventState",
Types.STRING);
originalEventState = getRuntimeContext().getState(originalEventDescriptor);
}

@Override
public void processElement(JsonNode event, Context context, Collector<
JsonNode> collector) throws Exception {
if (GantryEventUtils.isEvictionEvent(event)) {
// If it is the eviction event, add a timer with a delay.
originalEventState.update(mapper.writeValueAsString(event));
// TODO: Remove comment
logger.info("Storing eviction event" + event.get(GantryEventField.EVENT_ID.
getFieldName()).asText());
context.timerService().registerProcessingTimeTimer(context.timestamp() +
10000); // 10 seconds.
} else {
// If it is not the eviction event, just send it through.
// TODO: Remove comment
logger.info("Emitting non eviction event" + event.get(GantryEventField.
EVENT_ID.getFieldName()).asText());
collector.collect(event);
}
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<
JsonNode> collector) throws Exception {
String event = originalEventState.value();
try {
// Retrieve the original event from the ValueState
JsonNode originalEvent = mapper.readTree(event);

// Clear the original event state for this key
originalEventState.clear();

// Emit the original event with a timestamp
collector.collect(originalEvent);
} catch (Exception e) {
logger.error("Failed to emit eviction event " + event + context.
getCurrentKey() + timestamp);
}
}
}


Thanks,
Kireet

Reply via email to