[
https://issues.apache.org/jira/browse/FLINK-37685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17946699#comment-17946699
]
Renxiang Zhou commented on FLINK-37685:
---------------------------------------
[~zakelly]
Got it, thanks for pointing it out. (y)
We added some unit tests in our internal version, and some of them failed
because we were using asyncProcessWithKey incorrectly.
> process the callback immediately when the processing key equals to the key in
> record context
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-37685
> URL: https://issues.apache.org/jira/browse/FLINK-37685
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Async State Processing
> Affects Versions: 2.0.0
> Reporter: Renxiang Zhou
> Priority: Major
>
> Currently, asyncProcessWithKey creates a new record context on every call.
> When the processing key equals the key of the current record context, we can
> process the callback directly in that context, which skips building a new
> one and improves processing efficiency.
>
> {code:java}
> public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
>     RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
>     // -----------
>     // If the key equals the old context's key, just process the callback
>     // with a sync point.
>     if (oldContext != null && oldContext.getKey() != null && oldContext.getKey().equals(key)) {
>         asyncExecutionController.syncPointRequestWithCallback(processing, true);
>         return;
>     }
>     // -----------
>     // build a context and switch to the new context
>     RecordContext<K> newContext = asyncExecutionController.buildContext(null, key, true);
>     newContext.retain();
>     asyncExecutionController.setCurrentContext(newContext);
>     // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when the call's key
>     // pass the same key in.
>     asyncExecutionController.syncPointRequestWithCallback(processing, true);
>     newContext.release();
>     // switch to original context
>     asyncExecutionController.setCurrentContext(oldContext);
> }
> {code}
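
For readers without the Flink sources at hand, the same-key fast path described in the issue can be modelled with a small standalone sketch. Everything in it (ToyContext, ToyController, processWithKey, the contextsBuilt counter) is a hypothetical stand-in invented for illustration, not a Flink class or method, and it ignores sync points and reference counting entirely. It only shows, under those assumptions, that a call with the current key reuses the existing context while a call with a different key builds one and then restores the old one.

```java
/** Toy model of the same-key fast path; none of these types are Flink classes. */
public class SameKeyFastPathSketch {

    /** Hypothetical stand-in for a record context: it only carries a key. */
    static final class ToyContext<K> {
        final K key;

        ToyContext(K key) {
            this.key = key;
        }
    }

    /** Hypothetical stand-in for the controller: tracks the current context and counts builds. */
    static final class ToyController<K> {
        ToyContext<K> current;
        int contextsBuilt = 0;

        ToyContext<K> buildContext(K key) {
            contextsBuilt++;
            return new ToyContext<>(key);
        }

        void processWithKey(K key, Runnable processing) {
            ToyContext<K> old = current;
            // Fast path: the requested key matches the current context, so reuse it.
            if (old != null && old.key != null && old.key.equals(key)) {
                processing.run();
                return;
            }
            // Slow path: switch to a fresh context, run, then restore the old one.
            current = buildContext(key);
            try {
                processing.run();
            } finally {
                current = old;
            }
        }
    }

    /** Returns how many contexts were built for one same-key call and one different-key call. */
    static int demo() {
        ToyController<String> controller = new ToyController<>();
        controller.current = new ToyContext<>("a"); // set directly, not counted as a build
        controller.processWithKey("a", () -> { });  // same key: no new context
        controller.processWithKey("b", () -> { });  // different key: one new context
        return controller.contextsBuilt;
    }

    public static void main(String[] args) {
        System.out.println("contexts built: " + demo()); // prints "contexts built: 1"
    }
}
```

In this toy model the saving is one object allocation per same-key call; whether the real change is safe depends on how the controller orders callbacks for a key that is already in flight, which the sketch does not attempt to model.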