We have encountered a rare but very nasty bug in Flink related to the
serialization of POJOs in keystate.
-- Timeline --
1) Write a specific item of class C to keystate at Time1. That key is not
read again until step 5.
2) Time elapses
3) class C is schema evolved to include an additional field
4) Time elapses
5) When reading the specific item written above, we get an EOFException
thrown in AbstractRocksDBState.migrateSerializedValue
6) Reading the item puts Flink into a restart loop of death. Manual
intervention is required.
-- details at the time of writing the value to keystate --
class C { // at Time1
private String fieldAA = "AA";
private String fieldBB = "BB";
}
The serialized buffer looks like so:
02                flag
00 03 41 41       is_null, len+1, 'A', 'A'
00 03 42 42       is_null, len+1, 'B', 'B'
Serialized Field list is: [fieldAA, fieldBB]
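As an illustration only, the layout above can be reproduced with the
following minimal Java sketch. It is not Flink's PojoSerializer. The class
and method names (PojoLayoutSketch, writeStringField, serializeTime1) are
hypothetical, and it assumes short ASCII strings so that len+1 and each
character fit in a single byte.

```java
import java.io.ByteArrayOutputStream;

class PojoLayoutSketch {
    // Writes one String field as: is_null byte, then len+1, then one byte per char.
    // Assumption: short ASCII strings only, so len+1 and each char fit in one byte.
    static void writeStringField(ByteArrayOutputStream out, String value) {
        out.write(value == null ? 1 : 0);   // is_null
        if (value == null) {
            return;
        }
        out.write(value.length() + 1);      // len+1
        for (char c : value.toCharArray()) {
            out.write(c);                   // one byte per ASCII char
        }
    }

    // Serializes the Time1 shape of class C: flag byte, fieldAA, fieldBB.
    static byte[] serializeTime1(String fieldAA, String fieldBB) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x02);                    // flag byte as seen in the dump
        writeStringField(out, fieldAA);
        writeStringField(out, fieldBB);
        return out.toByteArray();
    }

    // Formats bytes as space-separated lowercase hex pairs.
    static String toHex(byte[] data) {
        StringBuilder sb = new StringBuilder();
        for (byte b : data) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // prints: 02 00 03 41 41 00 03 42 42
        System.out.println(toHex(serializeTime1("AA", "BB")));
    }
}
```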
-- schema evolution --
class C { // at Time3
private String fieldAA = "AA";
private Integer fieldAB = -1;
private String fieldBB = "BB";
}
-- details at the time of reading the value from keystate --
The serialized buffer looks like so:
02                flag
00 03 41 41       is_null, len+1, 'A', 'A'
00 ff ff ff ff    is_null, -1
00 03 42 42       is_null, len+1, 'B', 'B'
Serialized Field list is: [fieldAA, fieldBB]
When reading the buffer, Flink reads fieldAA just fine, but then attempts
to read ff ff ff ff as a string for fieldBB. Something has altered the
buffer so that it contains fieldAB, but the Serialized Field list does not
include fieldAB.
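The mismatch can be sketched in a few lines of Java. This is a simplified
stand-in, not Flink's actual read path: the names (StaleFieldListSketch,
readVarLength, readStringField, describeRead) are hypothetical, and it
assumes that the string length is stored as a variable-length integer
(7 bits per byte, high bit meaning "more bytes follow") and that each
character occupies one byte. Under those assumptions, reading the Time3
buffer with the field list [fieldAA, fieldBB] interprets ff ff ff ff 00 as
a huge length and runs off the end of the buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

class StaleFieldListSketch {
    // Buffer as written at Time1: flag, fieldAA, fieldBB.
    static final byte[] TIME1 = {0x02, 0x00, 0x03, 0x41, 0x41, 0x00, 0x03, 0x42, 0x42};
    // Buffer as observed at read time: an Integer -1 (fieldAB) sits between the strings.
    static final byte[] TIME3 = {0x02, 0x00, 0x03, 0x41, 0x41,
            0x00, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
            0x00, 0x03, 0x42, 0x42};

    // Assumed encoding: 7 payload bits per byte, high bit set means another byte follows.
    static int readVarLength(DataInputStream in) throws IOException {
        int b = in.readUnsignedByte();
        int len = b & 0x7f;
        int shift = 7;
        while (b >= 0x80) {
            b = in.readUnsignedByte();
            len |= (b & 0x7f) << shift;
            shift += 7;
        }
        return len;
    }

    // Reads is_null, then len+1, then one byte per character (ASCII assumption).
    static String readStringField(DataInputStream in) throws IOException {
        if (in.readBoolean()) {
            return null;
        }
        int length = readVarLength(in) - 1;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < length; i++) {
            sb.append((char) in.readUnsignedByte()); // throws EOFException when the data runs out
        }
        return sb.toString();
    }

    // Reads a buffer using the stale field list [fieldAA, fieldBB] and reports the outcome.
    static String describeRead(byte[] buffer) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer));
        String fieldAA = null;
        try {
            in.readUnsignedByte(); // flag
            fieldAA = readStringField(in);
            String fieldBB = readStringField(in);
            return "fieldAA=" + fieldAA + ", fieldBB=" + fieldBB;
        } catch (EOFException e) {
            return "EOFException after fieldAA=" + fieldAA;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(describeRead(TIME1)); // prints: fieldAA=AA, fieldBB=BB
        System.out.println(describeRead(TIME3)); // prints: EOFException after fieldAA=AA
    }
}
```

In this sketch the five bytes ff ff ff ff 00 decode to a length of
268435454 characters, far more than the three bytes that remain, which is
consistent with an EOFException rather than a garbled string.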
-- Runtime Details and notes --
Flink 1.14.3
Stateful Functions
300 GB savepoint size.
The corruption seems to occur only when a few months elapse between the
write and the read.
Questions:
A) Any insight into the general mechanism related to POJOs and
serialization?
B) What can cause a keystate to be migrated? Clearly a read does. What
about just checkpointing over time, or reading keystate with a key that is
"close" to the other key?
C) If a specific key in keystate is deserialized from RocksDB, does Flink
deserialize other (adjacent?) keys in the "block" of data?
D) Are there tools for manually editing Flink savepoints?