Hi team,

We want to use the State Processor API [1] to clean up some legacy state.
Here are our steps:

1. Create a new savepoint (~1.5 TB)
2. Submit state processor jobs
3. Write the results to a new savepoint

We created 8 task managers with 120 slots in total to execute it. Here are the related configurations:

```
kubernetes.pod-template-file.taskmanager: /srv/pod-template-taskmanager.yml
kubernetes.taskmanager.cpu: 15
taskmanager.memory.jvm-metaspace.size: 512m
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.process.size: 52g
taskmanager.memory.task.off-heap.size: 1g # For queryable state
taskmanager.numberOfTaskSlots: 15
taskmanager.rpc.port: 6122
```

The state processor execution logic is below:

```
def execute(savePointPath: String, newSavePointPath: String, config: Config) = {
  val env = ExecutionEnvironment.getExecutionEnvironment

  // Read the keyed state of the existing operator from the old savepoint.
  val state = Savepoint
    .load(env, savePointPath, new EmbeddedRocksDBStateBackend)
    .readKeyedState(
      "read-key-state-id",
      new CustomKeyStateReadFunction(config),
      createTypeInformation[CustomKey],
      createTypeInformation[CustomKeyState],
    )

  // Re-key the state that was read and bootstrap the new operator state from it.
  val transformation = OperatorTransformation
    .bootstrapWith(state)
    .keyBy((x: CustomKeyState) => x.key)
    .transform(new CustomKeyStateBootstrapFunction(config))

  // Write the bootstrapped state to a new savepoint (RocksDB backend, incremental flag set to true).
  Savepoint
    .create(new EmbeddedRocksDBStateBackend(true), MAX_PARALLELISM)
    .withOperator("operator_id", transformation)
    .write(newSavePointPath)

  env.execute()
}
```

However, the state processor job is very slow (13 Mb/min) and resource usage is very low (CPU: 10-15%, memory: 50-60%).

Are there any tips to speed up the process?

Thanks!

--
*Regards,*
*Oscar*
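
P.S. To put the numbers above in perspective, here is a rough back-of-the-envelope sketch of how long the job would take at the observed rate. It is only an estimate under two assumptions that may not hold exactly: that "13 Mb/min" means megabytes per minute across the whole job, and that the ~1.5 TB savepoint is measured in decimal units. The object and value names are made up for illustration.

```scala
// Rough estimate only. Assumptions (not measured facts):
//  - "13 Mb/min" is megabytes per minute for the whole job
//  - the savepoint is ~1.5 TB with 1 TB = 1,000,000 MB
object SavepointEta {
  val savepointMb: Double = 1.5 * 1000 * 1000
  val throughputMbPerMin: Double = 13.0

  // 8 task managers x taskmanager.numberOfTaskSlots (15)
  val totalSlots: Int = 8 * 15

  // Time to process the full savepoint at the observed rate.
  val etaMinutes: Double = savepointMb / throughputMbPerMin
  val etaDays: Double = etaMinutes / (60 * 24)

  // Average share of the observed throughput per slot.
  val perSlotMbPerMin: Double = throughputMbPerMin / totalSlots

  def main(args: Array[String]): Unit = {
    println(s"total slots: $totalSlots")
    println(f"estimated duration: $etaDays%.1f days")
    println(f"throughput per slot: $perSlotMbPerMin%.3f Mb/min")
  }
}
```

Under those assumptions the job would need roughly 80 days to finish, which is why we are looking for ways to speed it up.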