Hello,

Unfortunately, the Avro binary protocol is incompatible with direct schema evolution as you have attempted here. To decode an Avro message, you must always (per the spec <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>) have access to the schema that was used to write it. This is a fundamental limitation of the protocol: it deliberately encodes no field numbers or separators of any kind in the binary representation, so a reader walking the bytes with a different schema either misinterprets them or, as in your case, finds leftover bytes at the end.
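To make that concrete, here is a minimal sketch (plain Avro, outside of Beam) of what having the writer schema buys you: when the consumer knows which schema produced the bytes, Avro's schema resolution can map the payload onto an older reader schema and simply drop the new trailing field. The class and method names below are just illustrative.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class SchemaResolutionExample {
  /**
   * Decodes bytes written with writerSchema into a record shaped by readerSchema.
   * Avro consumes the payload according to the writer schema (so no bytes are
   * left over), then resolves the result against the reader schema: unknown
   * trailing fields are dropped and defaults are filled in for missing ones.
   */
  public static GenericRecord decodeWithResolution(
      byte[] payload, Schema writerSchema, Schema readerSchema) throws IOException {
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<>(writerSchema, readerSchema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
    return reader.read(null, decoder);
  }
}

This resolution step is exactly what Beam's AvroCoder cannot perform on its own, because the Pub/Sub payload carries no hint about which writer schema was used.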
To facilitate (but not directly enable) schema evolution, Avro's binary format offers an optional single-object encoding, which prefixes the message with a header containing a fingerprint (hash) of the canonical schema representation. Using that support still requires the reader to identify, from the fingerprint, which schema was used to encode the message. If you want to evolve your protocol in the future, I would highly recommend using protobuf for binary encoding. If that is not possible, you should consider explicitly tagging your Pub/Sub messages with an attribute identifying that the new schema version is in use, and performing the decoding yourself with the correct schema whenever that attribute is present (a rough sketch follows after the quoted message below). This would also require you to use the PubsubIO.readMessages method instead of readAvros, and to update all readers before updating your writers to publish the new schema.

-Daniel

On Fri, Oct 14, 2022 at 2:46 PM Claire McGinty <[email protected]> wrote:

> Hi Beam devs,
>
> We recently encountered an issue with evolving the Avro schemas used in a
> Pub/Sub message queue. We added a new, nullable-with-default field to the
> end of the producer schema. We tested the compatibility change by using
> AvroCoder
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java>'s
> encode method on the evolved schema and checking that the result could
> still be decoded with the old schema. However, our consumer Beam pipelines
> threw errors at read time: unexpected extra bytes after decoding, coming
> from CoderUtils' decodeFromByteArray
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CoderUtils.java#L103-L105>
> method, where it checks for the presence of extra bytes after reading all
> fields.
>
> I created a quick Gist
> <https://gist.github.com/clairemcginty/0166cc19bae1768f604c684da8a3d99b#file-avroschemaevolutiontest-scala-L61-L84>
> that summarizes the issue.
>
> I'm wondering if there's any workaround for this: either adding a
> special case to CoderUtils to check whether the coder is an AvroCoder, or
> adding an overridable method to the Coder abstract class called
> allowExtraBytesAfterDecoding? I understand that both of those ideas have
> some fairly scary implications, though :) I realize that the most "correct"
> way to handle this is to have all consumers migrate to a new schema before
> the producer does, but that can be quite a difficult task with a large
> fleet of consumers, especially for streaming jobs where the Coder change
> could result in a compatibility check failure.
>
> Looking forward to your input,
> Claire
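As promised above, here is a rough, untested sketch of the attribute-based approach, using PubsubIO.readMessagesWithAttributes (the variant of readMessages that retains the attribute map). The attribute name "schemaVersion", the schemasByVersion map, and the surrounding class are hypothetical, not part of Beam or Avro; a real pipeline would also need to decide how to distribute the schemas to workers (for example as JSON strings), since org.apache.avro.Schema is only Serializable in more recent Avro versions.

import java.io.IOException;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

/** Hypothetical sketch: attribute name, schema map, and subscription are illustrative. */
public class VersionedAvroRead {
  private static final String SCHEMA_VERSION_ATTR = "schemaVersion";

  public static PCollection<GenericRecord> read(
      Pipeline pipeline,
      String subscription,
      Map<String, Schema> schemasByVersion, // writer schemas keyed by version attribute
      Schema readerSchema) {                // schema the consumer was built against
    return pipeline
        // readMessagesWithAttributes keeps the attribute map next to the raw payload.
        .apply(PubsubIO.readMessagesWithAttributes().fromSubscription(subscription))
        .apply(MapElements.into(TypeDescriptor.of(GenericRecord.class))
            .via((PubsubMessage msg) -> {
              // Pick the writer schema from the attribute; fall back to the reader
              // schema for older messages published without the attribute.
              Schema writerSchema =
                  schemasByVersion.getOrDefault(
                      msg.getAttribute(SCHEMA_VERSION_ATTR), readerSchema);
              GenericDatumReader<GenericRecord> reader =
                  new GenericDatumReader<>(writerSchema, readerSchema);
              try {
                return reader.read(
                    null, DecoderFactory.get().binaryDecoder(msg.getPayload(), null));
              } catch (IOException e) {
                throw new RuntimeException("Failed to decode Avro payload", e);
              }
            }))
        .setCoder(AvroCoder.of(readerSchema));
  }
}

The key point is that the pipeline, not AvroCoder, chooses the writer schema, which is why all readers need to know about the new schema version before any producer starts publishing it.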
