Hey all,

I'm working on migrating our Flink job from session mode on Hadoop to
application mode on Kubernetes.
It's been going great so far, but I've hit a wall with something that
seems simple.

In the first phase of the migration, I want to remove some operators (their
state can be discarded) and focus on getting the primary pipeline running.
For that, I have to start the cluster from a savepoint with the
"allowNonRestoredState" option enabled.

The problem is that I can't find any way to set it. I tried 4 approaches,
both separately and together:

1) Adding --allowNonRestoredState to
   flink run-application -t kubernetes-application
2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to
   flink run-application -t kubernetes-application
3) Adding "execution.savepoint.ignore-unclaimed-state: true" to the local
   flink-conf.yaml on the machine where I run flink run-application
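4) Overriding it in the application code:
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.runtime.jobgraph.SavepointConfigOptions

    val sigh = new Configuration()
    sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true)
    env.configure(sigh)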
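For clarity, approaches 1 and 2 combined into a single submission look roughly like the sketch below. It is only a sketch: the wrapper function submit_from_savepoint is hypothetical, the cluster ID flink-test is inferred from the ConfigMap name flink-config-flink-test, the savepoint is passed with the standard -s/--fromSavepoint option, and the container image and other deployment options are left out.

```shell
# Hypothetical helper: submit the job in Kubernetes application mode,
# restoring from a savepoint while skipping state of removed operators.
# Assumes the cluster ID "flink-test" (inferred from the ConfigMap name
# "flink-config-flink-test"); other required options such as
# -Dkubernetes.container.image are omitted for brevity.
submit_from_savepoint() {
  savepoint_path="$1"   # savepoint URL to restore from
  job_jar="$2"          # job jar URI, e.g. local:///... inside the image
  flink run-application -t kubernetes-application \
    -Dkubernetes.cluster-id=flink-test \
    -Dexecution.savepoint.ignore-unclaimed-state=true \
    --fromSavepoint "$savepoint_path" \
    --allowNonRestoredState \
    "$job_jar"
}
```

It would be called as submit_from_savepoint <savepoint URL> local:///opt/flink/usrlib/<job>.jar, where both arguments are placeholders.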

Every time, the resulting pod ends up with "false" for this setting in its
ConfigMap:
$ kc describe cm/flink-config-flink-test | grep ignore
execution.savepoint.ignore-unclaimed-state: false

And I get the exception:
java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint
<URL>. Cannot map checkpoint/savepoint state for operator
68895e9129981bfc6d96d1dad715298e to the new program, because the operator
is not available in the new program. If you want to allow to skip this, you
can set the --allowNonRestoredState option on the CLI.

It looks as if something overrides the value with false, so none of these
settings ever takes effect.

Could this be a bug, or am I doing something wrong?

For context, the savepoint was produced by Flink 1.8.2, and the version I'm
trying to run on Kubernetes is 1.14.3.

-- 
With regards,
Andrey Bulgakov
