I know Reuven has put some thought into evolving schemas, but I'm not sure
it's documented anywhere as of now. The only documentation I've come across
as I bump around the schema code are some comments deep in RowCoder [1].
Essentially the current serialization format for a row includes a field count
as a prefix so we can detect "simple" schema changes like column additions
and deletions. When decoding a Row, if the current schema contains *more*
fields than the encoded Row, the remaining fields are populated with nulls in the
resulting Row object. If the current schema contains *fewer* fields than
the encoded Row, the additional ones are just dropped.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
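
To make that concrete, here is a rough sketch of how a field-count prefix
lets a decoder tolerate added or dropped fields. The decodeField helper and
the method name are hypothetical -- this is not the actual RowCoderGenerator
code, just an illustration of the idea:

  import java.io.IOException;
  import java.io.InputStream;
  import org.apache.beam.sdk.schemas.Schema;
  import org.apache.beam.sdk.util.VarInt;
  import org.apache.beam.sdk.values.Row;

  static Row decodeWithFieldCountPrefix(Schema currentSchema, InputStream in)
      throws IOException {
    int encodedFieldCount = VarInt.decodeInt(in); // prefix written at encode time
    int knownFieldCount = currentSchema.getFieldCount();
    Row.Builder builder = Row.withSchema(currentSchema);
    // Decode the fields both the encoded Row and the current schema have.
    for (int i = 0; i < Math.min(encodedFieldCount, knownFieldCount); i++) {
      builder.addValue(decodeField(currentSchema.getField(i), in)); // hypothetical helper
    }
    // Current schema has *more* fields than the encoded Row: pad with nulls.
    for (int i = encodedFieldCount; i < knownFieldCount; i++) {
      builder.addValue(null);
    }
    // Current schema has *fewer* fields than the encoded Row: the trailing
    // encoded fields are simply not added (a real coder still has to consume
    // or skip their bytes so the stream stays aligned).
    return builder.build();
  }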

On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba <r...@skraba.com> wrote:

> I'm also really interested in the question of evolving schemas... It's
> something I've also put off figuring out :D
>
> With all its warts, the LazyAvroCoder technique (a coder backed by
> some sort of schema registry) _could_ work with "homogeneish" data
> (i.e. if the number of schemas in play for a single coder is much,
> much smaller than the number of elements), even if none of the
> schemas are known at Pipeline construction.  The portability job
> server (which already stores and serves artifacts for running jobs)
> might be the right place to put a schema registry... but I'm not
> entirely convinced it's the right way to go either.
>
> At the same time, "simply" bumping a known schema to a new version is
> roughly equivalent to updating a pipeline in place.
>
> Sending the data as Java-serialized Rows will be equivalent to sending
> the entire schema with every record, so it _would_ work without
> involving a new, distributed state between one coder's encode and
> another's decode (at the cost of message size, of course).
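>
> A minimal sketch of that Java-serialization route, assuming MyData and the
> Rows it holds are java.io.Serializable (if they are not, a small
> Serializable wrapper around the schema and field values would be needed):
>
>   import java.io.ByteArrayOutputStream;
>   import java.io.ObjectOutputStream;
>
>   ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>   try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
>     // The whole object graph -- including the Rows' schemas -- is written
>     // into every single payload, which is what makes the messages
>     // self-describing (and also what makes them large).
>     out.writeObject(pushingData); // the MyData instance from the snippet below
>   }
>   byte[] payload = bytes.toByteArray(); // what would be pushed to Pub/Sub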
>
> Ryan
>
>
> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <pabl...@google.com> wrote:
> >
> > +dev
> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
> useful.
> >
> > The data is change data capture from databases, and I'm putting it into
> a Beam Row. The schema for the Row is generally homogeneous, but subject to
> change at some point in the future if the schema in the database changes.
> It's unusual and unlikely, but possible. I have no idea how Beam deals with
> evolving schemas. +Reuven Lax is there documentation / examples / anything
> around this? : )
> >
> > I think evolving schemas is an interesting question....
> >
> > For now, I am going to Java-serialize the objects, and delay figuring
> this out. But I reckon I'll have to come back to this...
> >
> > Best
> > -P.
> >
> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <r...@skraba.com> wrote:
> >>
> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
> >> pipeline construction time, but can be discovered from the instance of
> >> MyData?
> >>
> >> Once discovered, is the schema "homogeneous" for all instances of
> >> MyData?  (i.e. someRow will always have the same schema for all
> >> instances afterwards, and there won't be another someRow with a
> >> different schema).
> >>
> >> We've encountered a parallel "problem" with pure Avro data, where the
> >> instance is a GenericRecord containing its own Avro schema but
> >> *without* knowing the schema until the pipeline is run.  The solution
> >> that we've been using is a bit hacky, but we're using an ad hoc
> >> per-job schema registry and a custom coder where each worker saves the
> >> schema in the `encode` before writing the record, and loads it lazily
> >> in the `decode` before reading.
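> >>
> >> As a rough sketch of that pattern (the SchemaStore interface, the class
> >> name and the fingerprint-keyed lookup are illustrative assumptions, not
> >> the actual LazyAvroCoder code from [1]):
> >>
> >>   import java.io.IOException;
> >>   import java.io.InputStream;
> >>   import java.io.OutputStream;
> >>   import java.io.Serializable;
> >>   import org.apache.avro.Schema;
> >>   import org.apache.avro.SchemaNormalization;
> >>   import org.apache.avro.generic.GenericDatumReader;
> >>   import org.apache.avro.generic.GenericDatumWriter;
> >>   import org.apache.avro.generic.GenericRecord;
> >>   import org.apache.avro.io.BinaryDecoder;
> >>   import org.apache.avro.io.BinaryEncoder;
> >>   import org.apache.avro.io.DecoderFactory;
> >>   import org.apache.avro.io.EncoderFactory;
> >>   import org.apache.beam.sdk.coders.CustomCoder;
> >>   import org.apache.beam.sdk.util.VarInt;
> >>
> >>   public class LazySchemaCoder extends CustomCoder<GenericRecord> {
> >>     /** Some per-job shared store: a socket in the driver, a file in a
> >>      *  known location, an in-memory map for the DirectRunner, ... */
> >>     public interface SchemaStore extends Serializable {
> >>       void put(long fingerprint, Schema schema);
> >>       Schema get(long fingerprint);
> >>     }
> >>
> >>     private final SchemaStore store;
> >>
> >>     public LazySchemaCoder(SchemaStore store) { this.store = store; }
> >>
> >>     @Override
> >>     public void encode(GenericRecord value, OutputStream out) throws IOException {
> >>       Schema schema = value.getSchema();
> >>       long fp = SchemaNormalization.parsingFingerprint64(schema);
> >>       store.put(fp, schema);   // save the schema before writing the record
> >>       VarInt.encode(fp, out);  // only the fingerprint travels with the record
> >>       BinaryEncoder enc = EncoderFactory.get().directBinaryEncoder(out, null);
> >>       new GenericDatumWriter<GenericRecord>(schema).write(value, enc);
> >>       enc.flush();
> >>     }
> >>
> >>     @Override
> >>     public GenericRecord decode(InputStream in) throws IOException {
> >>       long fp = VarInt.decodeLong(in);
> >>       Schema schema = store.get(fp);  // loaded lazily on the reading worker
> >>       // directBinaryDecoder does no read-ahead buffering, so it won't
> >>       // consume bytes that belong to whatever follows this record.
> >>       BinaryDecoder dec = DecoderFactory.get().directBinaryDecoder(in, null);
> >>       return new GenericDatumReader<GenericRecord>(schema).read(null, dec);
> >>     }
> >>   }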
> >>
> >> The original code is available[1] (be gentle, it was written with Beam
> >> 0.4.0-incubating... and has continued to work until now).
> >>
> >> In practice, the ad hoc schema registry is just a server socket in the
> >> Spark driver, in-memory for DirectRunner / local mode, and a
> >> read/write to a known location in other runners.  There are definitely
> >> other solutions with side-inputs and providers, and the job server in
> >> portability looks like an exciting candidate for a per-job schema
> >> registry story...
> >>
> >> I'm super eager to see if there are other ideas or a contribution we
> >> can make in this area that's "Beam Row" oriented!
> >>
> >> Ryan
> >>
> >> [1]
> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
> >>
> >> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pabl...@google.com>
> wrote:
> >> >
> >> > Hello all,
> >> > I am writing a utility to push data to PubSub. My data class looks
> something like so:
> >> > ==========
> >> > class MyData {
> >> >   String someId;
> >> >   Row someRow;
> >> >   Row someOtherRow;
> >> > }
> >> > ==============
> >> > The schema for the Rows is not known a priori. It is contained in the
> Row. I am then pushing this data to pubsub:
> >> > ===========
> >> > MyData pushingData = ....
> >> > WhatCoder? coder = ....
> >> >
> >> > ByteArrayOutputStream os = new ByteArrayOutputStream();
> >> > coder.encode(pushingData, os);
> >> >
> >> > pubsubClient.connect();
> >> >
> pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
> >> > pubsubClient.close();
> >> > =================
> >> > What's the right coder to use in this case? I don't know if
> SchemaCoder will work, because it seems that it requires the Row's schema a
> priori. I have not been able to make AvroCoder work.
> >> >
> >> > Any tips?
> >> > Best
> >> > -P.
>
