Hi,
I'm trying to upgrade an application from Flink 1.14 and, at the same time,
switch to the new KafkaSink while restoring from a checkpoint.

I changed the UID of the KafkaSink completely and ran the application with
--allow-non-restored-state. However, when restoring, I still run into the
following error:

Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for OutputConversionOperator_e0941bd9a0231c2ea1e2ceec25bf6fcd_(3/3) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:482)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: java.lang.IllegalArgumentException: Illegal Capacity: -1
    at java.base/java.util.ArrayList.<init>(ArrayList.java:158)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.deserialize(TwoPhaseCommitSinkFunction.java:776)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.deserialize(TwoPhaseCommitSinkFunction.java:672)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
    at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
    ... 17 more
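
To illustrate how I read the innermost exception: it looks like a deserializer read an element count of -1 from bytes it did not expect and passed it to the ArrayList constructor. Below is a minimal sketch of that failure mode using only JDK classes. It is not Flink's code; readList and foreignBytes are hypothetical names, and the byte layout is an assumption made purely for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class IllegalCapacityDemo {

    // Hypothetical reader: expects an int element count followed by that many longs.
    static List<Long> readList(DataInputStream in) throws IOException {
        int size = in.readInt();
        // ArrayList rejects a negative initial capacity with IllegalArgumentException.
        List<Long> out = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            out.add(in.readLong());
        }
        return out;
    }

    // Bytes in a layout the reader does not expect (assumed here to begin with -1).
    static byte[] foreignBytes() {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(-1);
            out.writeLong(42L);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns "ok" on success, otherwise the message of the failure.
    static String tryRead(byte[] bytes) {
        try {
            readList(new DataInputStream(new ByteArrayInputStream(bytes)));
            return "ok";
        } catch (IllegalArgumentException | IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryRead(foreignBytes()));
    }
}
```

If that reading is right, the old serializer is being handed state it cannot parse, rather than the state being skipped.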

The StateSerializer being created is the old one for the
FlinkKafkaProducer. Is there any other way to work around this issue?
-- 
Best Regards,
Yuval Itzchakov.
