vinoyang created FLINK-14428:
--------------------------------

             Summary: Non-consistency key access in KeyedProcessFunction when use keyed state in both processElement and onTimer method
                 Key: FLINK-14428
                 URL: https://issues.apache.org/jira/browse/FLINK-14428
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
            Reporter: vinoyang

Scenario:

In a {{KeyedProcessFunction}}, using the keyed state API in both the {{processElement}} and {{onTimer}} methods may cause inconsistent key access.

Analysis:

For timers, {{InternalTimerServiceImpl}} sets the key context to the key carried by the timer (the key recorded when registerXXXTimeTimer was called) before invoking the callback:

{code:java}
public void onProcessingTime(long time) throws Exception {
    // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    // inside the callback.
    nextTimer = null;

    InternalTimer<K, N> timer;

    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey()); // here
        triggerTarget.onProcessingTime(timer);
    }

    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}
{code}

For the {{processElement}} method, {{OneInputStreamTask}} calls it after setting the key context:

{code:java}
@Override
public void emitRecord(StreamRecord<IN> record) throws Exception {
    synchronized (lock) {
        numRecordsIn.inc();
        operator.setKeyContextElement1(record); // here
        operator.processElement(record);
    }
}
{code}

The {{setCurrentKey}} call in the first snippet and the {{setKeyContextElement1}} call in the second snippet both end up in the same {{AbstractStreamOperator#setCurrentKey}} method. However, there is only one keyed state backend instance, and {{AbstractStreamOperator#setCurrentKey}} changes the current key of that backend.

So if we access the keyed state API in both {{processElement}} and {{onTimer}}, we may read the wrong state value, because either method may change the current key out from under the other, leaving state access inconsistent.
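
The shared-key mechanism described above can be illustrated with a minimal sketch. It is a simplified model, not Flink code: {{SharedKeyContextDemo}}, {{KeyedBackend}} and its methods are hypothetical names invented for this example, and the model deliberately ignores the {{synchronized (lock)}} seen in {{emitRecord}}. It only shows that state reads and writes are scoped to whichever key was set last on the single backend.

```java
import java.util.HashMap;
import java.util.Map;

public class SharedKeyContextDemo {

    /** Hypothetical stand-in for a keyed state backend: one "current key" shared by all callers. */
    static final class KeyedBackend {
        private final Map<String, Integer> valuesByKey = new HashMap<>();
        private String currentKey;

        void setCurrentKey(String key) {
            this.currentKey = key;
        }

        String getCurrentKey() {
            return currentKey;
        }

        // Reads and writes are implicitly scoped to whatever key was set last.
        Integer value() {
            return valuesByKey.get(currentKey);
        }

        void update(int newValue) {
            valuesByKey.put(currentKey, newValue);
        }
    }

    public static void main(String[] args) {
        KeyedBackend backend = new KeyedBackend();

        // Element path: the key context is set from the record, then state is written.
        backend.setCurrentKey("user-a");
        backend.update(1);

        // Timer path: the key context is set from the timer, then state is written.
        backend.setCurrentKey("user-b");
        backend.update(2);

        // Any read issued now is scoped to "user-b", the key set most recently.
        System.out.println(backend.getCurrentKey() + " -> " + backend.value());

        // Setting the element's key again makes its own value visible.
        backend.setCurrentKey("user-a");
        System.out.println(backend.getCurrentKey() + " -> " + backend.value());
    }
}
```

In this model each path sees consistent state as long as it sets the key before touching state and nothing else switches the key until it finishes. Whether the two paths can actually interleave in Flink is the question this issue raises.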