wuzhiyu created FLINK-25400:
-------------------------------

             Summary: RocksDBStateBackend configurations do not work with SavepointEnvironment
                 Key: FLINK-25400
                 URL: https://issues.apache.org/jira/browse/FLINK-25400
             Project: Flink
          Issue Type: Bug
          Components: API / State Processor
    Affects Versions: 1.12.2
            Reporter: wuzhiyu


Hi~

I'm trying to use flink-state-processor-api to do state migrations: reading 
state from an existing savepoint and writing it into a new savepoint after 
certain transformations.

However, the read throughput does not meet my expectations.

When I tried to tune RocksDB by enabling RocksDB native metrics, I found that 
the setting had no effect.

After some debugging, I found that when the job is running under a 
SavepointEnvironment, none of the RocksDBStateBackend configuration options are 
passed to RocksDBStateBackend.

The whole process is described below (the code shown is from 
release-1.12.2):

First, 
org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend is 
invoked:

 
{code:java}
// org.apache.flink.streaming.runtime.tasks.StreamTask#createStateBackend
private StateBackend createStateBackend() throws Exception {
    final StateBackend fromApplication =
            configuration.getStateBackend(getUserCodeClassLoader());

    return StateBackendLoader.fromApplicationOrConfigOrDefault(
            fromApplication,
            getEnvironment().getTaskManagerInfo().getConfiguration(),
            getUserCodeClassLoader(),
            LOG);
} {code}
*getEnvironment()* returns a SavepointEnvironment instance.
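The selection that `StateBackendLoader.fromApplicationOrConfigOrDefault` performs can be approximated with plain Java collections. This is a simplified model based on reading the 1.12 source, not Flink code: `SimpleBackend`, `ConfigurableBackend` and `load` are hypothetical stand-ins, and the configuration is modelled as a `Map<String, String>`. It shows why the configuration returned by `getEnvironment()` matters even when the application sets a RocksDB backend explicitly: a configurable backend is re-created from that configuration, so an empty configuration contributes nothing. Letting application-level values win over the configuration is a simplification made for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

public class BackendLoaderModel {

    // Hypothetical stand-in for a state backend: it only records the options it was built with.
    static class SimpleBackend {
        final String name;
        final Map<String, String> options;

        SimpleBackend(String name, Map<String, String> options) {
            this.name = name;
            this.options = options;
        }
    }

    // Hypothetical stand-in for a configurable backend: configure() returns a copy
    // whose options are the given configuration overlaid with the backend's own options.
    static class ConfigurableBackend extends SimpleBackend {
        ConfigurableBackend(String name, Map<String, String> options) {
            super(name, options);
        }

        ConfigurableBackend configure(Map<String, String> config) {
            Map<String, String> merged = new HashMap<>(config);
            merged.putAll(options); // simplification: application-level values win
            return new ConfigurableBackend(name, merged);
        }
    }

    // Models the precedence: application backend (configured if possible),
    // else a backend named in the configuration, else a default backend.
    static SimpleBackend load(SimpleBackend fromApplication, Map<String, String> config) {
        if (fromApplication != null) {
            if (fromApplication instanceof ConfigurableBackend) {
                return ((ConfigurableBackend) fromApplication).configure(config);
            }
            return fromApplication;
        }
        String configured = config.get("state.backend");
        if (configured != null) {
            return new SimpleBackend(configured, new HashMap<>(config));
        }
        return new SimpleBackend("default", new HashMap<>());
    }

    public static void main(String[] args) {
        Map<String, String> taskManagerConfig = new HashMap<>();
        taskManagerConfig.put("state.backend.rocksdb.metrics.estimate-num-keys", "true");

        SimpleBackend app = new ConfigurableBackend("rocksdb", new HashMap<>());

        // Regular TaskManager: the metrics option reaches the backend.
        System.out.println(load(app, taskManagerConfig).options);
        // SavepointEnvironment: an empty configuration is passed, so the option is lost.
        System.out.println(load(app, new HashMap<>()).options);
    }
}
```

In this model the same application-defined backend ends up with the metrics option or without it depending only on the configuration handed to `load`.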

 

Next, 
*org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo* is 
invoked, which returns a new 
*org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo* instance.

 
{code:java}
// org.apache.flink.state.api.runtime.SavepointEnvironment#getTaskManagerInfo
@Override
public TaskManagerRuntimeInfo getTaskManagerInfo() {
    return new SavepointTaskManagerRuntimeInfo(getIOManager());
} {code}
 

Finally, 
*org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration* 
is invoked. It returns a new, empty Configuration, so all configured options, 
including the RocksDB native metrics options, are lost.
{code:java}
// org.apache.flink.state.api.runtime.SavepointTaskManagerRuntimeInfo#getConfiguration
@Override
public Configuration getConfiguration() {
    return new Configuration();
} {code}
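The effect of this chain can be reduced to a small stdlib-only Java model. Everything in it is hypothetical: `RuntimeInfo`, `EmptyRuntimeInfo` and `ConfiguredRuntimeInfo` are illustrative stand-ins for `TaskManagerRuntimeInfo` and `SavepointTaskManagerRuntimeInfo`, the configuration is modelled as a `Map<String, String>`, and `ConfiguredRuntimeInfo` only sketches one possible direction (having the caller supply the configuration). It is not an actual Flink patch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class RuntimeInfoModel {

    // Hypothetical, simplified stand-in for TaskManagerRuntimeInfo.
    interface RuntimeInfo {
        Map<String, String> getConfiguration();
    }

    // Mirrors the behaviour described above: every call returns a fresh, empty configuration.
    static class EmptyRuntimeInfo implements RuntimeInfo {
        @Override
        public Map<String, String> getConfiguration() {
            return new HashMap<>();
        }
    }

    // Hypothetical alternative: the caller supplies the configuration,
    // and a read-only copy of it is returned on every call.
    static class ConfiguredRuntimeInfo implements RuntimeInfo {
        private final Map<String, String> configuration;

        ConfiguredRuntimeInfo(Map<String, String> configuration) {
            this.configuration = Collections.unmodifiableMap(new HashMap<>(configuration));
        }

        @Override
        public Map<String, String> getConfiguration() {
            return configuration;
        }
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("state.backend.rocksdb.metrics.estimate-num-keys", "true");

        RuntimeInfo current = new EmptyRuntimeInfo();
        RuntimeInfo proposed = new ConfiguredRuntimeInfo(userConfig);

        // The current behaviour drops the option; the hypothetical variant keeps it.
        System.out.println("current:  " + current.getConfiguration());
        System.out.println("proposed: " + proposed.getConfiguration());
    }
}
```

Running `main` prints an empty map for the current behaviour and the metrics option for the configuration-carrying variant.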
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
