Renxiang Zhou created FLINK-37685:
-------------------------------------
Summary: process the callback immediately when the processing key
equals to the key in record context
Key: FLINK-37685
URL: https://issues.apache.org/jira/browse/FLINK-37685
Project: Flink
Issue Type: Improvement
Components: Runtime / Async State Processing
Affects Versions: 2.0.0
Environment: Currently, the implementation of asyncProcessWithKey
needs to create a new record context for each call. When the processing key
equals to the current record context key, we can directly process the Callback
to improve processing efficiency.
{code:java}
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception>
processing) {
RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
// if the current key is equals to the old context key, we just process it
with sync point.
if (oldContext != null && oldContext.getKey() != null &&
oldContext.getKey().equals(key)) {
asyncExecutionController.syncPointRequestWithCallback(processing, true);
return;
}
// build a context and switch to the new context
RecordContext<K> newContext = asyncExecutionController.buildContext(null,
key, true);
newContext.retain();
asyncExecutionController.setCurrentContext(newContext);
// Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when
the call's key
// pass the same key in.
asyncExecutionController.syncPointRequestWithCallback(processing, true);
newContext.release();
// switch to original context
asyncExecutionController.setCurrentContext(oldContext);
} {code}
Reporter: Renxiang Zhou
--
This message was sent by Atlassian Jira
(v8.20.10#820010)