Hi Beam devs,

We recently encountered an issue with evolving the Avro schemas used in a
Pub/Sub message queue. We added a new, nullable-with-default field to the
end of the producer schema. We checked the compatibility of the change by
encoding a record with the evolved schema using AvroCoder
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java>
and verifying that it could still be decoded with the old schema. However,
our consumer Beam pipelines threw errors at read time: unexpected extra
bytes after decoding, raised by CoderUtils' decodeFromByteArray
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L103-L105>
method, which fails if any bytes remain in the stream after all fields have
been read.

I created a quick Gist
<https://gist.github.com/clairemcginty/0166cc19bae1768f604c684da8a3d99b#file-avroschemaevolutiontest-scala-L61-L84>
that
summarizes the issue.
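
For reference, here's a rough inline sketch of the same failure mode (same
idea as the Gist, just in Java; the schemas and names below are made up for
illustration, not our actual ones):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.util.CoderUtils;

public class AvroCoderEvolutionSketch {
  public static void main(String[] args) throws Exception {
    // Old (consumer-side) schema: one string field.
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"}]}");

    // Evolved (producer-side) schema: nullable-with-default field appended
    // at the end.
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"tag\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

    GenericRecord evolved = new GenericRecordBuilder(newSchema)
        .set("id", "abc")
        .set("tag", "extra")
        .build();

    // Producer encodes with the evolved schema...
    byte[] bytes = CoderUtils.encodeToByteArray(AvroCoder.of(newSchema), evolved);

    // ...but a consumer still running with the old schema only reads the old
    // fields, leaving the new field's bytes unconsumed, so decodeFromByteArray
    // throws "unexpected extra bytes after decoding".
    CoderUtils.decodeFromByteArray(AvroCoder.of(oldSchema), bytes);
  }
}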

I'm wondering if there's any workaround for this: either adding a special
case to CoderUtils to check whether the coder is an AvroCoder, or adding an
overridable method to the Coder abstract class called
allowExtraBytesAfterDecoding? I understand that both of those ideas have
some fairly scary implications, though :) I realize that the most "correct"
way to handle this is to have all consumers migrate to the new schema
before the producer does, but that can be quite a difficult task with a
large fleet of consumers, especially for streaming jobs, where the Coder
change could result in a compatibility check failure.
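
To make the second idea concrete, here's a purely hypothetical sketch of
what such a hook might look like; the stand-in class names and the
simplified extra-bytes check are made up for illustration and don't exist
in Beam today:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Stand-in types so the sketch compiles on its own; the real change would
// go into Coder and CoderUtils.
abstract class SketchCoder<T> {
  abstract T decode(InputStream in) throws IOException;

  // Hypothetical hook: a coder like AvroCoder could override this to signal
  // that trailing bytes after a decode are expected (e.g. after a field was
  // appended to the writer schema).
  boolean allowExtraBytesAfterDecoding() {
    return false;
  }
}

final class SketchCoderUtils {
  static <T> T decodeFromByteArray(SketchCoder<T> coder, byte[] encoded)
      throws IOException {
    ByteArrayInputStream in = new ByteArrayInputStream(encoded);
    T value = coder.decode(in);
    // Today this check fails unconditionally when bytes are left over; the
    // hook would let a coder opt out of it.
    if (in.available() > 0 && !coder.allowExtraBytesAfterDecoding()) {
      throw new IOException(
          in.available() + " unexpected extra bytes after decoding " + value);
    }
    return value;
  }
}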

Looking forward to your input,
Claire
