The most challenging part, as I understand it, surrounds automatically
inferred schemas from POJOs, where Java's nondeterministic iteration order,
combined with a row's inherent ordering, means that even an identical
pipeline will need some metadata to plumb the right fields to the right
column indices.

Most relational migration management I've done incorporates explicit
migration logic along with changes to the schema. This is quite a lot more
robust, but more implementation work, than having a default policy
proto/avro/thrift style. I think there's a lot to explore here.

Kenn

On Thu, Jul 25, 2019 at 9:59 AM Brian Hulette <bhule...@google.com> wrote:

> I know Reuven has put some thought into evolving schemas, but I'm not sure
> it's documented anywhere as of now. The only documentation I've come across
> as I bump around the schema code are some comments deep in RowCoder [1].
> Essentially the current serialization format for a row includes a row
> count as a prefix so we can detect "simple" schema changes like column
> additions and deletions. When decoding a Row, if the current schema
> contains *more* fields than the encoded Row, the remaining fields are
> populated with nulls in the resulting Row object. If the current schema
> contains *fewer* fields than the encoded Row, the additional ones are
> just dropped.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
>
> On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba <r...@skraba.com> wrote:
>
>> I'm also really interested in the question of evolving schemas... It's
>> something I've also put off figuring out :D
>>
>> With all its warts, the LazyAvroCoder technique (a coder backed by
>> some sort of schema registry) _could_ work with "homogeneish" data
>> (i.e. if the number of schemas in play for a single coder is much,
>> much smaller than the number of elements), even if none of the the
>> schemas are known at Pipeline construction.  The portability job
>> server (which already stores and serves artifacts for running jobs)
>> might be the right place to put a schema registry... but I'm not
>> entirely convinced it's the right way to go either.
>>
>> At the same time, "simply" bumping a known schema to a new version is
>> roughly equivalent to updating a pipeline in place.
>>
>> Sending the data as Java-serialized Rows will be equivalent to sending
>> the entire schema with every record, so it _would_ work without
>> involving a new, distributed state between one coders encode and
>> anothers decode (at the cost of message size, of course).
>>
>> Ryan
>>
>>
>> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <pabl...@google.com> wrote:
>> >
>> > +dev
>> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
>> useful.
>> >
>> > The data is change data capture from databases, and I'm putting it into
>> a Beam Row. The schema for the Row is generally homogeneous, but subject to
>> change at some point in the future if the schema in the database changes.
>> It's unusual and unlikely, but possible. I have no idea how Beam deals with
>> evolving schemas. +Reuven Lax is there documentation / examples / anything
>> around this? : )
>> >
>> > I think evolving schemas is an interesting question....
>> >
>> > For now, I am going to Java-serialize the objects, and delay figuring
>> this out. But I reckon I'll have to come back to this...
>> >
>> > Best
>> > -P.
>> >
>> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <r...@skraba.com> wrote:
>> >>
>> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>> >> pipeline construction time, but can be discovered from the instance of
>> >> MyData?
>> >>
>> >> Once discovered, is the schema "homogeneous" for all instance of
>> >> MyData?  (i.e. someRow will always have the same schema for all
>> >> instances afterwards, and there won't be another someRow with a
>> >> different schema).
>> >>
>> >> We've encountered a parallel "problem" with pure Avro data, where the
>> >> instance is a GenericRecord containing it's own Avro schema but
>> >> *without* knowing the schema until the pipeline is run.  The solution
>> >> that we've been using is a bit hacky, but we're using an ad hoc
>> >> per-job schema registry and a custom coder where each worker saves the
>> >> schema in the `encode` before writing the record, and loads it lazily
>> >> in the `decode` before reading.
>> >>
>> >> The original code is available[1] (be gentle, it was written with Beam
>> >> 0.4.0-incubating... and has continued to work until now).
>> >>
>> >> In practice, the ad hoc schema registry is just a server socket in the
>> >> Spark driver, in-memory for DirectRunner / local mode, and a a
>> >> read/write to a known location in other runners.  There are definitely
>> >> other solutions with side-inputs and providers, and the job server in
>> >> portability looks like an exciting candidate for per-job schema
>> >> registry story...
>> >>
>> >> I'm super eager to see if there are other ideas or a contribution we
>> >> can make in this area that's "Beam Row" oriented!
>> >>
>> >> Ryan
>> >>
>> >> [1]
>> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>> >>
>> >> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pabl...@google.com>
>> wrote:
>> >> >
>> >> > Hello all,
>> >> > I am writing a utility to push data to PubSub. My data class looks
>> something like so:
>> >> > ==========
>> >> > class MyData {
>> >> >   String someId;
>> >> >   Row someRow;
>> >> >   Row someOtherRow;
>> >> > }
>> >> > ==============
>> >> > The schema for the Rows is not known a-priori. It is contained by
>> the Row. I am then pushing this data to pubsub:
>> >> > ===========
>> >> > MyData pushingData = ....
>> >> > WhatCoder? coder = ....
>> >> >
>> >> > ByteArrayOutputStream os = new ByteArrayOutputStream();
>> >> > coder.encode(this, os);
>> >> >
>> >> > pubsubClient.connect();
>> >> >
>> pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
>> >> > pubsubClient.close();
>> >> > =================
>> >> > What's the right coder to use in this case? I don't know if
>> SchemaCoder will work, because it seems that it requires the Row's schema a
>> priori. I have not been able to make AvroCoder work.
>> >> >
>> >> > Any tips?
>> >> > Best
>> >> > -P.
>>
>

Reply via email to