+ dev

Many thanks for sharing your observations and findings on this topic, Cristian! I'm copying dev@ as well to attract more attention to this problem.
— Alexey

> On 18 Nov 2022, at 18:21, Cristian Constantinescu <zei...@gmail.com> wrote:
>
> Hi everyone,
>
> I'm using Beam on Flink with Avro-generated records. If the record
> schema changes, the Flink state cannot be restored. I want to send
> this email out for anyone who may need this info in the future, and
> also to ask others for possible solutions, because this problem is so
> easy to hit that I'm having a hard time figuring out what other users
> of Beam on the Flink runner are doing to circumvent it.
>
> The in-depth discussion of the issue can be found here [1] (thanks
> Maximilian). There are also a few more emails about this here [2] and
> here [3].
>
> The gist of the issue is that Beam serializes the coders it uses into
> the Flink state, and some of those coders hold references to the
> Beans/Pojos/Java classes they serialize and deserialize. Flink
> serializes its state using Java serialization, which means the Flink
> state ends up containing a reference to the Bean/Pojo/Java class name
> and its serialVersionUID. When the (Avro-generated) pojo changes, so
> does its serialVersionUID, and Flink can no longer deserialize the
> Beam state: the serialVersionUID mismatch is not on the coder itself,
> but on the pojo type the coder was holding when it was serialized.
>
> I decided to try each coder capable of handling pojos, one by one, to
> see if any would work: the SerializableCoder, the AvroCoder, and the
> SchemaCoder/RowCoder. I used the SpecificRecord version of the
> AvroCoder (not the GenericRecord one) and the non-Row version of the
> SchemaCoder (i.e., the one that returns a pojo type, not a Row type).
> They all failed the test below (spelled out step by step to be very
> explicit, but really, it's just simple schema evolution).
>
> Test:
> 1. Create an Avro pojo (IDL-generated):
>
>    record FooRecord {
>      union {null, string} dummy1 = null;
>    }
>
> 2. Create a pipeline with a simple stateful DoFn, set the desired
>    coder for FooRecord (I tried the SerializableCoder, the AvroCoder,
>    and the SchemaCoder/RowCoder), and populate the state with a few
>    FooRecord objects.
> 3. Start the pipeline.
> 4. Stop the pipeline with a savepoint.
> 5. Augment FooRecord to add another field after dummy1.
> 6. Start the pipeline, restoring from the saved savepoint.
> 7. Observe this exception while the savepoint is deserialized:
>
>    "Caused by: java.io.InvalidClassException: com.mymodels.FooRecord;
>    local class incompatible: stream classdesc serialVersionUID = <some
>    number>, local class serialVersionUID = <some other number>"
>
> There are a few workarounds.
>
> Workaround A:
> Right now my working solution is to implement what Pavel suggested
> (thanks Pavel) in [3]. Quoting him: "having my business logic-related
> POJOs still Avro-generated, but I introduced another, generic one,
> which just stores schema & payload bytes, and does not need to change.
> then using a DelegateCoder that converts the POJO to/from that generic
> schema-bytes pojo that never changes".
>
> Basically something like this (pseudocode):
>
>    record FlinkStateValue {
>      string schema;
>      bytes value;
>    }
>
>    var fooCoder = AvroCoder.of(FooRecord.class);
>    var delegateCoder = DelegateCoder.of(
>        AvroCoder.of(FlinkStateValue.class),
>        (FooRecord in) -> FlinkStateValue.newBuilder()
>            .setSchema(FooRecord.getClassSchema().toString())
>            .setValue(CoderUtils.encodeToByteArray(fooCoder, in))
>            .build(),
>        (FlinkStateValue in) ->
>            CoderUtils.decodeFromByteArray(fooCoder, in.getValue()));
>
>    p.getCoderRegistry().registerCoderForClass(FooRecord.class, delegateCoder);
>
> The downside is that there is now yet another encode/decode step,
> which wastes CPU cycles. The upside is that things are decoupled: the
> DelegateCoder could use a RowCoder for FooRecord instead of the
> AvroCoder, or any other coder for that matter, and you can switch
> between them with only a code change.
>
> Workaround B:
> Difficulty: hard! Use the Flink State Processor API [4] to update the
> serialized Beam state: modify the FooRecord serialVersionUID stored in
> that state to the new one after the schema evolution, then save the
> state and start your pipeline with the evolved FooRecord.
>
> Workaround C:
> Wrap the Avro-generated FooRecord in a real pojo, or an AutoValue, or
> anything over which you have full control of the serialVersionUID, and
> use that in your pipeline, especially when putting things into state.
>
> The problem arises when the Avro-generated records have lots of
> properties and/or nested records. It becomes tedious to essentially
> duplicate them all as Pojos/AutoValues.
>
> Conclusion:
> I want to end by asking the community for advice. For those of you who
> use Beam with Avro records on the Flink runner, how do you handle
> state when the Avro schema inevitably evolves?
>
> It just seems like such a simple use case and such an easy pitfall
> that I'm unsure why only three people (four including me) have asked
> for advice on this issue. Are the four of us doing something wrong?
>
> Thanks in advance for your advice,
> Cristian
>
> [1] https://www.mail-archive.com/user@beam.apache.org/msg05648.html
> [2] https://www.mail-archive.com/user@beam.apache.org/msg07169.html
> [3] https://lists.apache.org/thread/rlnljx684pvg3fvfv3nxvbdbnxg19nns
> [4] https://flink.apache.org/feature/2019/09/13/state-processor-api.html
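To make the idea behind Workarounds A and C concrete: what matters is that the class that actually passes through Java serialization is one you control, with a pinned serialVersionUID and a shape that never changes, while the evolving Avro record only travels inside it as schema text plus encoded bytes. Below is a minimal, stdlib-only Java sketch of such an envelope. All names (StateEnvelope, the sample schema string, the payload) are illustrative, not Beam or Flink APIs; in a real pipeline the payload would be Avro-encoded bytes and the envelope would still need a coder (e.g. SerializableCoder) registered for it.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;

// Hypothetical envelope: the only class Java serialization ever sees.
// Its serialVersionUID is pinned explicitly, so recompiling the rest of
// the codebase (including evolving the wrapped Avro record) never
// invalidates previously serialized state.
class StateEnvelope implements Serializable {
    private static final long serialVersionUID = 1L; // pinned, never recomputed

    private final String schema;  // writer schema, kept so old bytes stay decodable
    private final byte[] payload; // the record's encoded bytes (e.g. Avro binary)

    StateEnvelope(String schema, byte[] payload) {
        this.schema = schema;
        this.payload = payload;
    }

    String getSchema() { return schema; }
    byte[] getPayload() { return payload; }

    // Round-trip helpers mimicking what Flink does with Java serialization.
    static byte[] javaSerialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object javaDeserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        StateEnvelope original = new StateEnvelope(
            "{\"type\":\"record\",\"name\":\"FooRecord\"}",
            "dummy1-value".getBytes(StandardCharsets.UTF_8));

        // Serialize and deserialize as Flink would when writing/restoring a savepoint.
        StateEnvelope restored =
            (StateEnvelope) javaDeserialize(javaSerialize(original));

        System.out.println(restored.getSchema());
        System.out.println(new String(restored.getPayload(), StandardCharsets.UTF_8));
    }
}
```

Because only StateEnvelope's class descriptor ends up in the savepoint, adding a field to the wrapped record changes the payload bytes and the stored schema string, but not the serialVersionUID check; schema evolution is then handled by the Avro reader/writer-schema resolution at decode time rather than by Java serialization.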