Hi everyone,

I'm using Beam on Flink with Avro generated records. If the record
schema changes, the Flink state cannot be restored. I just want to
send this email out for anyone who may need this info in the future
and also ask others for possible solutions as this problem is so
easily hit, that I'm having a hard time figuring out what other users
of Beam running on the Flink runner are doing to circumvent it.

The in-depth discussion of the issue can be found here [1] (thanks
Maximilian). There are also a few more emails about this here [2], and
here [3].

The gist of the issue is that Beam serializes the coders used into the
Flink state, and some of those coders hold references to the
Bean/Pojos/Java classes they serialize/deserialize to. Flink
serializes its state using Java serialization, that means that in the
Flink state we will get a reference to the Bean/Pojo/Java class name
and the related serialVersionUID. When the pojo (Avro generated)
changes, so does its serialVersionUID, and Flink cannot deserialize
the Beam state anymore because the serialVersionUID doesn't match, not
on the Coder, but on the Pojo type that coder was holding when it got
serialized.

I decided to try each coder capable of handling Pojos, one by one, to
see if any would work. That is, I tried the SerializableCoder,
AvroCoder and the SchemaCoder/RowCoder. In the case of AvroCoder and
SerializableCoder, I have used the SpecificRecord version (not the
GenericRecord one) and the non-Row (ie: the one that returns a Pojo
type, not Row type) version respectively. They all failed the below
test (added it to be very explicit, but really, it's just simple
schema evolution).

Test:
1. Create a avro pojo (idl generated pojo):
record FooRecord {
union {null, string} dummy1 = null;
}
2. Create a pipeline with a simple stateful DoFn, set desired coder
for FooRecord (I tried the SerializableCoder, AvroCoder and the
SchemaCoder/RowCoder), and populate state with a few FooRecord
objects.
3. Start the pipeline
4. Stop the pipeline with a savepoint.
5. Augment FooRecord to add another field after dummy1.
6. Start the pipeline restoring from the saved savepoint.
7. Observed this exception when deserializing the savepoint -->
"Caused by: java.io.InvalidClassException: com.mymodels.FooRecord;
local class incompatible: stream classdesc serialVersionUID = <some
number>, local class serialVersionUID = <some other number>"

There are a few workarounds.

Workaround A:
Right now my working solution is to implement what was suggested by
Pavel (thanks Pavel) in [3]. Quote from him "having my business
logic-related POJOs still Avro-generated, but I introduced another,
generic one, which just stores schema & payload bytes, and does not
need to change. then using a DelegateCoder that converts the POJO
to/from that generic schema-bytes pojo that never changes".

Basically something like this (pseudocode):
record FlinkStateValue {
string schema;
bytes value;
}

var delegateCoder = DelegateCoder.of(
AvroCoder.of(FlinkStateValue.class),
(FooRecord in) ->
FlinkStateValue.setSchema(FooRecord.getSchema()).setValue(AvroCoder.of(FooRecord.class).encode(in)),
(FlinkStateValue in) -> return
AvroCoder.of(FooRecord.class).decode(in.getValue())
) ;

p.getCoderRegistry().registerCoderForClass(FooRecord.class, delegateCoder)

The downside is that now there's yet another deserialization step,
which wastes CPU cycles. The upside is that things are decoupled, that
is, I think the DelegateCoder could use a RowCoder.of(FooRecord)
instead of the AvroCoder.of(FooRecord), or any other coder for that
matter and you can change between them with only a code change.

Workaround B:
Difficulty hard! Use the Flink state api [4] and update the Beam
serialized state to modify the FooRecord serialVersionUID stored in
that state to the new one after the schema evolution, then save the
state and start your pipeline with the evolved FooRecord.

Workaround C:
Wrap the Avro generated FooRecord to a real Pojo or AutoValue or
anything that you have full control over serialVersionUID, and use
that in your pipeline especially when putting things into the state.

Problem arises when the Avro generated records have lots of properties
and or nested records. It becomes tedious to essentially duplicate
them to Pojo/AutoValue.

Conclusion:
I want to end by asking advice from the community. For those of you
who use Beam with Avro records running on the Flink runner, how do you
handle state when the Avro schema inevitably evolves?

It just seems like it's such a simple use case and such an easy
pittrap to fall into, that I'm unsure why there's only 3 people (4
including me) who asked for advice for this issue. Are the 4 of us
doing something wrong?

Thanks in advance for your advice,
Cristian

[1] https://www.mail-archive.com/user@beam.apache.org/msg05648.html
[2] https://www.mail-archive.com/user@beam.apache.org/msg07169.html
[3] https://lists.apache.org/thread/rlnljx684pvg3fvfv3nxvbdbnxg19nns
[4] https://flink.apache.org/feature/2019/09/13/state-processor-api.html

Reply via email to