I know Reuven has put some thought into evolving schemas, but I'm not sure it's documented anywhere as of now. The only documentation I've come across while bumping around the schema code is a set of comments deep in RowCoder [1]. Essentially, the current serialization format for a row includes the field count as a prefix, so we can detect "simple" schema changes like column additions and deletions. When decoding a Row, if the current schema contains *more* fields than the encoded Row, the remaining fields are populated with nulls in the resulting Row object. If the current schema contains *fewer* fields than the encoded Row, the additional ones are just dropped.
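If I'm reading those comments right, a round-trip across schema versions would look something like the sketch below (the schemas and class name are made up, and the added field has to be nullable for the null-padding to produce a valid Row):
==========
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class RowEvolutionDemo {
  public static void main(String[] args) throws Exception {
    // v1 is the writer's schema; v2 adds one nullable column.
    Schema v1 = Schema.builder().addStringField("id").addInt64Field("count").build();
    Schema v2 = Schema.builder()
        .addStringField("id")
        .addInt64Field("count")
        .addNullableField("label", Schema.FieldType.STRING)
        .build();

    // Encode with the old schema...
    Row oldRow = Row.withSchema(v1).addValues("a", 1L).build();
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    RowCoder.of(v1).encode(oldRow, os);

    // ...decode with the new one: the extra field should come back as
    // null, thanks to the field-count prefix described above.
    Row newRow = RowCoder.of(v2).decode(new ByteArrayInputStream(os.toByteArray()));
    System.out.println(newRow.getString("label")); // null
  }
}
==========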
[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296

On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba <r...@skraba.com> wrote:
> I'm also really interested in the question of evolving schemas... It's
> something I've also put off figuring out :D
>
> With all its warts, the LazyAvroCoder technique (a coder backed by
> some sort of schema registry) _could_ work with "homogeneish" data
> (i.e. if the number of schemas in play for a single coder is much,
> much smaller than the number of elements), even if none of the
> schemas are known at Pipeline construction. The portability job
> server (which already stores and serves artifacts for running jobs)
> might be the right place to put a schema registry... but I'm not
> entirely convinced it's the right way to go either.
>
> At the same time, "simply" bumping a known schema to a new version is
> roughly equivalent to updating a pipeline in place.
>
> Sending the data as Java-serialized Rows will be equivalent to
> sending the entire schema with every record, so it _would_ work
> without involving a new, distributed state between one coder's encode
> and another's decode (at the cost of message size, of course).
>
> Ryan
>
> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <pabl...@google.com> wrote:
> >
> > +dev
> > Thanks Ryan! This is quite helpful. Still not what I need :) - but useful.
> >
> > The data is change data capture from databases, and I'm putting it
> > into a Beam Row. The schema for the Row is generally homogeneous,
> > but subject to change at some point in the future if the schema in
> > the database changes. It's unusual and unlikely, but possible. I
> > have no idea how Beam deals with evolving schemas. +Reuven Lax is
> > there documentation / examples / anything around this? :)
> >
> > I think evolving schemas is an interesting question....
> >
> > For now, I am going to Java-serialize the objects, and delay
> > figuring this out. But I reckon I'll have to come back to this...
> >
> > Best
> > -P.
> >
> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <r...@skraba.com> wrote:
> >>
> >> Hello Pablo! Just to clarify -- the Row schemas aren't known at
> >> pipeline construction time, but can be discovered from the instance
> >> of MyData?
> >>
> >> Once discovered, is the schema "homogeneous" for all instances of
> >> MyData? (i.e. someRow will always have the same schema for all
> >> instances afterwards, and there won't be another someRow with a
> >> different schema).
> >>
> >> We've encountered a parallel "problem" with pure Avro data, where
> >> the instance is a GenericRecord containing its own Avro schema but
> >> *without* knowing the schema until the pipeline is run. The
> >> solution we've been using is a bit hacky: an ad hoc per-job schema
> >> registry and a custom coder where each worker saves the schema in
> >> the `encode` before writing the record, and loads it lazily in the
> >> `decode` before reading.
> >>
> >> The original code is available [1] (be gentle, it was written with
> >> Beam 0.4.0-incubating... and has continued to work until now).
> >>
> >> In practice, the ad hoc schema registry is just a server socket in
> >> the Spark driver, in-memory for DirectRunner / local mode, and a
> >> read/write to a known location in other runners. There are
> >> definitely other solutions with side-inputs and providers, and the
> >> job server in portability looks like an exciting candidate for the
> >> per-job schema registry story...
> >>
> >> I'm super eager to see if there are other ideas or a contribution
> >> we can make in this area that's "Beam Row" oriented!
> >>
> >> Ryan
> >>
> >> [1] https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
> >>
> >> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pabl...@google.com> wrote:
> >> >
> >> > Hello all,
> >> > I am writing a utility to push data to PubSub. My data class
> >> > looks something like so:
> >> > ==========
> >> > class MyData {
> >> >   String someId;
> >> >   Row someRow;
> >> >   Row someOtherRow;
> >> > }
> >> > ==========
> >> > The schema for the Rows is not known a priori. It is contained by
> >> > the Row. I am then pushing this data to pubsub:
> >> > ==========
> >> > MyData pushingData = ....
> >> > WhatCoder? coder = ....
> >> >
> >> > ByteArrayOutputStream os = new ByteArrayOutputStream();
> >> > coder.encode(pushingData, os);
> >> >
> >> > pubsubClient.connect();
> >> > pubsubClient.push(
> >> >     PubSubMessage.newBuilder().setData(os.toByteArray()).build());
> >> > pubsubClient.close();
> >> > ==========
> >> > What's the right coder to use in this case? I don't know if
> >> > SchemaCoder will work, because it seems that it requires the
> >> > Row's schema a priori. I have not been able to make AvroCoder
> >> > work.
> >> >
> >> > Any tips?
> >> > Best
> >> > -P.
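For what it's worth, the lazy-schema trick Ryan describes boils down to something like the sketch below. The SchemaRegistry interface is made up here (the real LazyAvroCoder's registry is the socket / known-location mechanism he mentions); the only real requirement is that `register` during encode and `lookup` during decode see the same mapping across workers:
==========
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

/** Hypothetical registry shared by all workers (socket, file, service...). */
interface SchemaRegistry extends Serializable {
  String register(Schema schema); // returns a stable id for the schema
  Schema lookup(String schemaId);
}

class LazySchemaCoder extends CustomCoder<GenericRecord> {
  private final SchemaRegistry registry;

  LazySchemaCoder(SchemaRegistry registry) {
    this.registry = registry;
  }

  @Override
  public void encode(GenericRecord record, OutputStream out) throws IOException {
    // Save the record's own schema out-of-band; write only its id inline.
    StringUtf8Coder.of().encode(registry.register(record.getSchema()), out);
    BinaryEncoder enc = EncoderFactory.get().directBinaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, enc);
    enc.flush();
  }

  @Override
  public GenericRecord decode(InputStream in) throws IOException {
    // Load the schema lazily from the id, then decode the payload with it.
    Schema schema = registry.lookup(StringUtf8Coder.of().decode(in));
    return new GenericDatumReader<GenericRecord>(schema)
        .read(null, DecoderFactory.get().directBinaryDecoder(in, null));
  }
}
==========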