I'm also really interested in the question of evolving schemas... It's
something I've also put off figuring out :D

With all its warts, the LazyAvroCoder technique (a coder backed by
some sort of schema registry) _could_ work with "homogeneish" data
(i.e. if the number of schemas in play for a single coder is much,
much smaller than the number of elements), even if none of the
schemas are known at Pipeline construction.  The portability job
server (which already stores and serves artifacts for running jobs)
might be the right place to put a schema registry... but I'm not
entirely convinced it's the right way to go either.
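
To make the registry idea concrete, here is a minimal sketch in plain
Java, assuming an in-memory map as a stand-in for the registry. It is not
the LazyAvroCoder code: SchemaRegistry, Element, encode and decode are
hypothetical names, and the schema is treated as an opaque string. Each
element carries only a small schema id; the schema itself is stored once.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a registry-backed coder. All names here are hypothetical. */
final class LazySchemaCoderSketch {

  /** Stand-in for a per-job schema registry: in-memory, so one JVM only. */
  static final class SchemaRegistry {
    private final Map<String, Integer> idsBySchema = new ConcurrentHashMap<>();
    private final Map<Integer, String> schemasById = new ConcurrentHashMap<>();
    private final AtomicInteger nextId = new AtomicInteger();

    /** Returns the id for a schema, storing the schema the first time it is seen. */
    int register(String schema) {
      return idsBySchema.computeIfAbsent(schema, s -> {
        int id = nextId.getAndIncrement();
        schemasById.put(id, s);
        return id;
      });
    }

    /** Looks up a schema that some encoder registered earlier. */
    String lookup(int id) {
      String schema = schemasById.get(id);
      if (schema == null) {
        throw new IllegalStateException("Unknown schema id " + id);
      }
      return schema;
    }

    int size() {
      return schemasById.size();
    }
  }

  /** A schema (as opaque text) plus an already-serialized payload. */
  static final class Element {
    final String schema;
    final byte[] payload;

    Element(String schema, byte[] payload) {
      this.schema = schema;
      this.payload = payload;
    }
  }

  /** Writes [schema id][payload length][payload]; the schema goes to the registry. */
  static byte[] encode(SchemaRegistry registry, Element element) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(registry.register(element.schema));
    out.writeInt(element.payload.length);
    out.write(element.payload);
    out.flush();
    return bytes.toByteArray();
  }

  /** Reads the schema id, resolves it through the registry, then reads the payload. */
  static Element decode(SchemaRegistry registry, byte[] encoded) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
    String schema = registry.lookup(in.readInt());
    byte[] payload = new byte[in.readInt()];
    in.readFully(payload);
    return new Element(schema, payload);
  }

  public static void main(String[] args) throws IOException {
    SchemaRegistry registry = new SchemaRegistry();
    String schema = "{\"type\":\"record\",\"name\":\"Change\",\"fields\":[]}";
    byte[] first = encode(registry, new Element(schema, "a".getBytes(StandardCharsets.UTF_8)));
    byte[] second = encode(registry, new Element(schema, "b".getBytes(StandardCharsets.UTF_8)));
    Element decoded = decode(registry, second);
    System.out.println("schemas stored: " + registry.size());
    System.out.println("bytes per element: " + first.length);
    System.out.println("schema recovered: " + decoded.schema.equals(schema));
  }
}
```

The catch is the one described above: in a real pipeline the two maps
would have to live somewhere that both the encoding and the decoding
worker can reach.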

At the same time, "simply" bumping a known schema to a new version is
roughly equivalent to updating a pipeline in place.

Sending the data as Java-serialized Rows is equivalent to sending
the entire schema with every record, so it _would_ work without
introducing new distributed state between one coder's encode and
another's decode (at the cost of message size, of course).
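
As a rough illustration of that trade-off, the sketch below
Java-serializes a toy row whose field names travel with its values.
SimpleRow is a hypothetical stand-in, not Beam's Row, and the sizes it
prints only show the direction of the overhead, not what Beam would
actually produce.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

/** Sketch of self-describing records via Java serialization. Names are hypothetical. */
final class SelfDescribingRowSketch {

  /** Toy row: the field names (the "schema") are serialized along with the values. */
  static final class SimpleRow implements Serializable {
    private static final long serialVersionUID = 1L;
    final ArrayList<String> fieldNames;
    final ArrayList<Object> values;

    SimpleRow(ArrayList<String> fieldNames, ArrayList<Object> values) {
      this.fieldNames = fieldNames;
      this.values = values;
    }
  }

  /** Each call uses a fresh stream, as each message would, so nothing is shared. */
  static byte[] serialize(Serializable object) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
    }
    return bytes.toByteArray();
  }

  static Object deserialize(byte[] encoded) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(encoded))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws IOException, ClassNotFoundException {
    ArrayList<String> names = new ArrayList<>(Arrays.asList("id", "name"));
    ArrayList<Object> values = new ArrayList<>(Arrays.asList("42", "Ada"));

    byte[] withSchema = serialize(new SimpleRow(names, values));
    byte[] valuesOnly = serialize(values);

    // The decoder needs no registry: the field names come back with the values.
    SimpleRow decoded = (SimpleRow) deserialize(withSchema);
    System.out.println("fields: " + decoded.fieldNames + ", values: " + decoded.values);
    System.out.println("row bytes: " + withSchema.length
        + ", values-only bytes: " + valuesOnly.length);
  }
}
```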

Ryan


On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada <pabl...@google.com> wrote:
>
> +dev
> Thanks Ryan! This is quite helpful. Still not what I need : ) - but useful.
>
> The data is change data capture from databases, and I'm putting it into a 
> Beam Row. The schema for the Row is generally homogeneous, but subject to 
> change at some point in the future if the schema in the database changes. 
> It's unusual and unlikely, but possible. I have no idea how Beam deals with 
> evolving schemas. +Reuven Lax is there documentation / examples / anything 
> around this? : )
>
> I think evolving schemas is an interesting question....
>
> For now, I am going to Java-serialize the objects, and delay figuring this 
> out. But I reckon I'll have to come back to this...
>
> Best
> -P.
>
> On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba <r...@skraba.com> wrote:
>>
>> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>> pipeline construction time, but can be discovered from the instance of
>> MyData?
>>
>> Once discovered, is the schema "homogeneous" for all instances of
>> MyData?  (i.e. someRow will always have the same schema for all
>> instances afterwards, and there won't be another someRow with a
>> different schema).
>>
>> We've encountered a parallel "problem" with pure Avro data, where the
>> instance is a GenericRecord containing its own Avro schema but
>> *without* knowing the schema until the pipeline is run.  The solution
>> that we've been using is a bit hacky, but we're using an ad hoc
>> per-job schema registry and a custom coder where each worker saves the
>> schema in the `encode` before writing the record, and loads it lazily
>> in the `decode` before reading.
>>
>> The original code is available[1] (be gentle, it was written with Beam
>> 0.4.0-incubating... and has continued to work until now).
>>
>> In practice, the ad hoc schema registry is just a server socket in the
>> Spark driver, in-memory for DirectRunner / local mode, and a
>> read/write to a known location in other runners.  There are definitely
>> other solutions with side-inputs and providers, and the job server in
>> portability looks like an exciting candidate for a per-job schema
>> registry story...
>>
>> I'm super eager to see if there are other ideas or a contribution we
>> can make in this area that's "Beam Row" oriented!
>>
>> Ryan
>>
>> [1] 
>> https://github.com/Talend/components/blob/master/core/components-adapter-beam/src/main/java/org/talend/components/adapter/beam/coders/LazyAvroCoder.java
>>
>> On Tue, Jul 23, 2019 at 12:49 AM Pablo Estrada <pabl...@google.com> wrote:
>> >
>> > Hello all,
>> > I am writing a utility to push data to PubSub. My data class looks 
>> > something like so:
>> > ==========
>> > class MyData {
>> >   String someId;
>> >   Row someRow;
>> >   Row someOtherRow;
>> > }
>> > ==============
>> > The schema for the Rows is not known a-priori. It is contained by the Row. 
>> > I am then pushing this data to pubsub:
>> > ===========
>> > MyData pushingData = ....
>> > WhatCoder? coder = ....
>> >
>> > ByteArrayOutputStream os = new ByteArrayOutputStream();
>> > coder.encode(pushingData, os);
>> >
>> > pubsubClient.connect();
>> > pubsubClient.push(PubSubMessage.newBuilder().setData(os.toByteArray()).build());
>> > pubsubClient.close();
>> > =================
>> > What's the right coder to use in this case? I don't know if SchemaCoder 
>> > will work, because it seems that it requires the Row's schema a priori. I 
>> > have not been able to make AvroCoder work.
>> >
>> > Any tips?
>> > Best
>> > -P.
