Hello,
I'm using Flink 1.17.1 and I have state TTL enabled in one of my Flink jobs,
where I'm using the RocksDB state backend for checkpointing. I have a value
state holding a POJO class (generated from an Avro schema). I added a new field
to my schema with a default value to keep it backwards compatible. However,
when I redeployed the job, I got a StateMigrationException. I have a similar
setup in other Flink jobs, where adding a field doesn't cause any trouble.
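To separate an Avro-level problem from a Flink-level one, the schema change itself can be checked in isolation. The following is only a minimal sketch: the field names `id` and `newField` are hypothetical placeholders rather than the real schema, and it assumes the Avro library is on the classpath.

```scala
import org.apache.avro.{Schema, SchemaCompatibility}

object SchemaCheck {
  // A fresh parser per schema: a single parser refuses to redefine a record name.
  private def parse(json: String): Schema = new Schema.Parser().parse(json)

  // Hypothetical "before" schema (placeholder field, not the real one).
  val oldSchema: Schema = parse(
    """{"type":"record","name":"MyPojoClass","fields":[
      |  {"name":"id","type":"string"}
      |]}""".stripMargin)

  // Hypothetical "after" schema: one added field that carries a default value.
  val newSchema: Schema = parse(
    """{"type":"record","name":"MyPojoClass","fields":[
      |  {"name":"id","type":"string"},
      |  {"name":"newField","type":"string","default":""}
      |]}""".stripMargin)

  // The new schema acts as reader, the old one as writer of the stored state.
  def compatibilityType: String =
    SchemaCompatibility
      .checkReaderWriterCompatibility(newSchema, oldSchema)
      .getType
      .toString
}
```

If this check passes for the real schemas, the incompatibility is more likely to come from how Flink picks the serializer for the state than from the Avro change itself.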
This is my stateTTL config:
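    import org.apache.flink.api.common.state.StateTtlConfig
    import org.apache.flink.api.common.time.Time

    StateTtlConfig
      .newBuilder(Time.days(7))
      .cleanupInRocksdbCompactFilter(1000)
      .build()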
This is how I enable it:
    import org.apache.flink.api.common.state.ValueStateDescriptor

    val myStateDescriptor: ValueStateDescriptor[MyPojoClass] =
      new ValueStateDescriptor[MyPojoClass](
        "test-name",
        classOf[MyPojoClass])
    myStateDescriptor.enableTimeToLive(initStateTTLConfig())
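Since the descriptor is built from a `Class`, Flink derives the serializer from that class, and state schema evolution is only supported for POJO and Avro types. A small diagnostic along these lines could show which serializer is actually chosen. It is a sketch, not a confirmed fix: `SerializerCheck` and `describe` are hypothetical helper names, and it assumes the job's usual Flink dependencies are on the classpath.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation

object SerializerCheck {
  // Returns the class name of the serializer Flink derives for `clazz`,
  // using a default ExecutionConfig (an assumption; the job's config may differ).
  def describe[T](clazz: Class[T]): String = {
    val typeInfo = TypeInformation.of(clazz)
    typeInfo.createSerializer(new ExecutionConfig).getClass.getName
  }
}
```

Calling `println(SerializerCheck.describe(classOf[MyPojoClass]))` in both the working and the failing job would make a difference visible: a name containing `AvroSerializer` or `PojoSerializer` points to an evolvable type, while `KryoSerializer` would suggest a generic fallback that cannot absorb an added field.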
This is the exception I end up with:
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51) must not be incompatible with the old state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51).
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:755)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:222)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:145)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:129)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:69)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    ... 25 more
Does anyone know what is causing the issue?
Cheers,
Irakli