Hi team,

We want to use the State Processor API [1] to clean up some legacy
state.

Here are our steps:
1. Create a new savepoint (~= 1.5TB)
2. Submit state processor jobs
3. Write results to a new savepoint

We run it on 8 task managers with 120 slots in total (15 slots each).
Here are the related configurations:
```
kubernetes.pod-template-file.taskmanager: /srv/pod-template-taskmanager.yml
kubernetes.taskmanager.cpu: 15
taskmanager.memory.jvm-metaspace.size: 512m
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.process.size: 52g
taskmanager.memory.task.off-heap.size: 1g # For queryable state
taskmanager.numberOfTaskSlots: 15
taskmanager.rpc.port: 6122
```
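
For reference, here is a rough sketch of how much managed memory each slot ends up with under these settings. It assumes the JVM overhead options are left at what I believe are the Flink defaults (fraction 0.1, min 192m, max 1g), since we do not set them; the object and value names are only illustrative.

```scala
// Back-of-the-envelope estimate of managed memory per slot.
// Assumption: taskmanager.memory.jvm-overhead.* is at its defaults
// (fraction 0.1, min 192m, max 1g); we do not override it.
object ManagedMemoryEstimate {
  val processMb: Double = 52 * 1024   // taskmanager.memory.process.size: 52g
  val metaspaceMb: Double = 512       // taskmanager.memory.jvm-metaspace.size: 512m
  val managedFraction: Double = 0.7   // taskmanager.memory.managed.fraction: 0.7
  val slots: Int = 15                 // taskmanager.numberOfTaskSlots: 15

  // JVM overhead is a fraction of the process size, clamped to [min, max].
  val jvmOverheadMb: Double = math.min(math.max(processMb * 0.1, 192), 1024)

  // Total Flink memory is what remains after metaspace and JVM overhead.
  val totalFlinkMb: Double = processMb - metaspaceMb - jvmOverheadMb

  // Managed memory is a fraction of total Flink memory, split evenly across slots.
  val managedMb: Double = totalFlinkMb * managedFraction
  val managedPerSlotMb: Double = managedMb / slots

  def main(args: Array[String]): Unit =
    println(f"managed total: $managedMb%.1f MB, per slot: $managedPerSlotMb%.1f MB")
}
```

If those assumptions hold, each slot gets roughly 2.4 GB of managed memory for RocksDB.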

The state processor logic is below:
```
  def execute(savePointPath: String, newSavePointPath: String, config: Config) = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val state = Savepoint
      .load(env, savePointPath, new EmbeddedRocksDBStateBackend)
      .readKeyedState(
        "read-key-state-id",
        new CustomKeyStateReadFunction(config),
        createTypeInformation[CustomKey],
        createTypeInformation[CustomKeyState],
      )

    val transformation = OperatorTransformation
      .bootstrapWith(state)
      .keyBy((x: CustomKeyState) => x.key)
      .transform(new CustomKeyStateBootstrapFunction(config))

    Savepoint
      .create(new EmbeddedRocksDBStateBackend(true), MAX_PARALLELISM)
      .withOperator("operator_id", transformation)
      .write(newSavePointPath)

    env.execute()
  }
```

However, the state processing seems very slow (13 Mb/min), and
resource usage is very low (CPU: 10-15%, memory: 50-60%).
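
To put that rate in perspective, here is a quick estimate of how long the whole savepoint would take. It assumes "Mb" above means megabytes, uses decimal units (1 TB = 1,000,000 MB), and assumes the rate stays constant; the names are only illustrative.

```scala
// Rough time-to-completion estimate at the observed rate.
// Assumptions: rate is in megabytes per minute, decimal units, constant rate.
object ThroughputEstimate {
  val savepointMb: Double = 1.5 * 1000 * 1000 // ~1.5 TB savepoint
  val rateMbPerMin: Double = 13.0             // observed processing rate

  val minutes: Double = savepointMb / rateMbPerMin
  val days: Double = minutes / 60 / 24

  def main(args: Array[String]): Unit =
    println(f"estimated duration: $minutes%.0f minutes (~$days%.1f days)")
}
```

Under those assumptions the job would need around 80 days, which is why we are looking for ways to speed it up.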

Are there any tips to speed up the process? Thanks!

-- 
*Regards,*
*Oscar*
