Hi everyone,

I'm using Beam on Flink with Avro-generated records. If the record schema changes, the Flink state cannot be restored. I'm sending this email both for anyone who may need this info in the future and to ask for possible solutions, because this problem is so easy to hit that I'm having a hard time figuring out how other users of Beam on the Flink runner circumvent it.
The in-depth discussion of the issue can be found here [1] (thanks Maximilian). There are also a few more emails about it here [2] and here [3].

The gist of the issue is that Beam serializes the coders in use into the Flink state, and some of those coders hold references to the Bean/Pojo/Java classes they serialize/deserialize. Flink serializes its state using Java serialization, which means the Flink state ends up holding a reference to the Bean/Pojo/Java class name together with its serialVersionUID. When the Pojo (Avro-generated) changes, so does its serialVersionUID, and Flink can no longer deserialize the Beam state: the serialVersionUID mismatch is not on the coder itself, but on the Pojo type the coder was holding when it was serialized.

I decided to try each coder capable of handling Pojos, one by one, to see if any would work: SerializableCoder, AvroCoder, and SchemaCoder/RowCoder. For AvroCoder I used the SpecificRecord variant (not the GenericRecord one), and for SerializableCoder the non-Row variant (i.e. the one that returns a Pojo type, not a Row type). They all failed the test below (spelled out step by step to be very explicit, but it is really just simple schema evolution).

Test:

1. Create an Avro Pojo (IDL-generated):

       record FooRecord {
         union {null, string} dummy1 = null;
       }

2. Create a pipeline with a simple stateful DoFn, set the desired coder for FooRecord (I tried SerializableCoder, AvroCoder, and SchemaCoder/RowCoder), and populate the state with a few FooRecord objects.
3. Start the pipeline.
4. Stop the pipeline with a savepoint.
5. Augment FooRecord by adding another field after dummy1.
6. Start the pipeline, restoring from the savepoint.
7. Observe this exception when deserializing the savepoint:

       Caused by: java.io.InvalidClassException: com.mymodels.FooRecord; local class incompatible: stream classdesc serialVersionUID = <some number>, local class serialVersionUID = <some other number>

There are a few workarounds.

Workaround A: Right now my working solution is to implement what Pavel suggested (thanks Pavel) in [3]. Quoting him: "having my business logic-related POJOs still Avro-generated, but I introduced another, generic one, which just stores schema & payload bytes, and does not need to change. then using a DelegateCoder that converts the POJO to/from that generic schema-bytes pojo that never changes". Basically something like this (a sketch along those lines, not battle-tested; CoderUtils is org.apache.beam.sdk.util.CoderUtils):

    // Avro IDL for the wrapper that never changes. The field is named writerSchema
    // rather than schema to avoid clashing with SpecificRecordBase#getSchema().
    record FlinkStateValue { string writerSchema; bytes value; }

    DelegateCoder<FooRecord, FlinkStateValue> delegateCoder = DelegateCoder.of(
        AvroCoder.of(FlinkStateValue.class),
        (FooRecord in) -> FlinkStateValue.newBuilder()
            .setWriterSchema(FooRecord.getClassSchema().toString())
            .setValue(ByteBuffer.wrap(CoderUtils.encodeToByteArray(AvroCoder.of(FooRecord.class), in)))
            .build(),
        // The stored writer schema is available on the wrapper if you need real Avro schema resolution.
        (FlinkStateValue in) -> CoderUtils.decodeFromByteArray(AvroCoder.of(FooRecord.class), in.getValue().array()));

    p.getCoderRegistry().registerCoderForClass(FooRecord.class, delegateCoder);

The downside is that there is now yet another encode/decode step, which wastes CPU cycles. The upside is that things are decoupled: I think the DelegateCoder could use a RowCoder for FooRecord instead of the AvroCoder, or any other coder for that matter, and you can switch between them with only a code change.

Workaround B: Difficulty hard! Use the Flink State Processor API [4] to update the Beam serialized state: patch the FooRecord serialVersionUID stored in that state to the new one after the schema evolution, save the state, and start your pipeline from the modified savepoint with the evolved FooRecord. A rough skeleton of what this involves is sketched below.
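To make Workaround B a bit more concrete, here is a rough, untested skeleton on top of the State Processor API (classes from org.apache.flink.state.api, Flink >= 1.9). The operator uid, the PatchedValue type, patchSerialVersionUid() and the reader/bootstrap functions are all placeholders for illustration; the genuinely hard part, locating and rewriting the Java-serialized coder bytes inside Beam's state, is exactly the part this sketch elides:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Load the savepoint taken in step 4 (path and state backend are illustrative).
    ExistingSavepoint savepoint = Savepoint.load(
        env, "file:///tmp/savepoints/savepoint-abc123", new MemoryStateBackend());

    // Read the stateful operator's keyed state. The real operator uid and state
    // names have to be dug out of the Beam-generated Flink job graph.
    DataSet<PatchedValue> patched = savepoint
        .readKeyedState("beam-stateful-dofn-uid", new MyKeyedStateReaderFunction())
        .map(value -> patchSerialVersionUid(value)); // the serialVersionUID byte surgery, elided

    // Swap the old operator state for the patched one and write a new savepoint.
    savepoint
        .removeOperator("beam-stateful-dofn-uid")
        .withOperator("beam-stateful-dofn-uid",
            OperatorTransformation.bootstrapWith(patched)
                .keyBy(v -> v.key)
                .transform(new MyKeyedStateBootstrapFunction()))
        .write("file:///tmp/savepoints/savepoint-patched");

    env.execute("patch-beam-savepoint");

You would then restart the pipeline, with the evolved FooRecord on the classpath, from the patched savepoint.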
Workaround C: Wrap the Avro-generated FooRecord in a real Pojo, an AutoValue, or anything else whose serialVersionUID you fully control, and use that in your pipeline, especially when putting things into state. The problem arises when the Avro-generated records have lots of properties and/or nested records: it becomes tedious to essentially duplicate them as Pojo/AutoValue classes.

Conclusion: I want to end by asking the community for advice. For those of you who use Beam with Avro records on the Flink runner, how do you handle state when the Avro schema inevitably evolves? It seems like such a simple use case and such an easy pitfall to fall into that I'm unsure why only three people (four including me) have asked for advice on this issue. Are the four of us doing something wrong?

Thanks in advance for your advice,
Cristian

[1] https://www.mail-archive.com/user@beam.apache.org/msg05648.html
[2] https://www.mail-archive.com/user@beam.apache.org/msg07169.html
[3] https://lists.apache.org/thread/rlnljx684pvg3fvfv3nxvbdbnxg19nns
[4] https://flink.apache.org/feature/2019/09/13/state-processor-api.html