[
https://issues.apache.org/jira/browse/FLINK-37685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Renxiang Zhou updated FLINK-37685:
----------------------------------
Description:
Currently, the implementation of asyncProcessWithKey needs to create a new
record context for each call. When the processing key equals to the current
record context key, we can directly process the Callback to improve processing
efficiency.
{code:java}
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception>
processing) {
RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
// if the current key is equals to the old context key, we just process it
with sync point.
if (oldContext != null && oldContext.getKey() != null &&
oldContext.getKey().equals(key)) {
asyncExecutionController.syncPointRequestWithCallback(processing, true);
return;
}
// build a context and switch to the new context
RecordContext<K> newContext = asyncExecutionController.buildContext(null,
key, true);
newContext.retain();
asyncExecutionController.setCurrentContext(newContext);
// Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when
the call's key
// pass the same key in.
asyncExecutionController.syncPointRequestWithCallback(processing, true);
newContext.release();
// switch to original context
asyncExecutionController.setCurrentContext(oldContext);
} {code}
Environment: (was: Currently, the implementation of asyncProcessWithKey
needs to create a new record context for each call. When the processing key
equals to the current record context key, we can directly process the Callback
to improve processing efficiency.
{code:java}
public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception>
processing) {
RecordContext<K> oldContext = asyncExecutionController.getCurrentContext();
// if the current key is equals to the old context key, we just process it
with sync point.
if (oldContext != null && oldContext.getKey() != null &&
oldContext.getKey().equals(key)) {
asyncExecutionController.syncPointRequestWithCallback(processing, true);
return;
}
// build a context and switch to the new context
RecordContext<K> newContext = asyncExecutionController.buildContext(null,
key, true);
newContext.retain();
asyncExecutionController.setCurrentContext(newContext);
// Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic when
the call's key
// pass the same key in.
asyncExecutionController.syncPointRequestWithCallback(processing, true);
newContext.release();
// switch to original context
asyncExecutionController.setCurrentContext(oldContext);
} {code}
)
> process the callback immediately when the processing key equals to the key in
> record context
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-37685
> URL: https://issues.apache.org/jira/browse/FLINK-37685
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Async State Processing
> Affects Versions: 2.0.0
> Reporter: Renxiang Zhou
> Priority: Major
>
> Currently, the implementation of asyncProcessWithKey needs to create a new
> record context for each call. When the processing key equals to the current
> record context key, we can directly process the Callback to improve
> processing efficiency.
>
> {code:java}
> public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception>
> processing) {
> RecordContext<K> oldContext =
> asyncExecutionController.getCurrentContext();
> // if the current key is equals to the old context key, we just process
> it with sync point.
> if (oldContext != null && oldContext.getKey() != null &&
> oldContext.getKey().equals(key)) {
> asyncExecutionController.syncPointRequestWithCallback(processing,
> true);
> return;
> }
> // build a context and switch to the new context
> RecordContext<K> newContext = asyncExecutionController.buildContext(null,
> key, true);
> newContext.retain();
> asyncExecutionController.setCurrentContext(newContext);
> // Same logic as RECORD_ORDER, since FIRST_STATE_ORDER is problematic
> when the call's key
> // pass the same key in.
> asyncExecutionController.syncPointRequestWithCallback(processing, true);
> newContext.release();
> // switch to original context
> asyncExecutionController.setCurrentContext(oldContext);
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)