Having slept on it here are my thoughts, but granted, AFAICT there is no
spec for schema's so my understanding is based on what I've learned in the
last 18-ish hours. If there is a spec, I'd love to see it.

*1.* Default behavior to support Schema's in some way doesn't remove the
need for certain specific uses of an atomic coder for a type. eg.
Specifying that Beam shouldn't look further into this type.

TBH the interaction between schema's and coders is the least interesting
part about schemas and matters in precious few circumstances. In
particular, when Grouping By Key, it seems like the schema coder should be
used by default but otherwise, not. Further, there's always the option to
"try the schema" encoding and should that fail, try any existing atomic
coder by default, though this risks data corruption in some situations.

*1.a *In a later beam version, it could be true that there's no need for
such uses. There's always the option to work around anything by writing at
DoFn that accepts a []byte, and then produces a given type. However
decoding []byte and encoding back again seems like a common enough
operation for some domains that having direct beam support in some capacity
is desirable for performance reasons.

*2.* It would be easy enough to have a pipeline fail at construction time
should a type not be able to derive a schema for itself, and it's put into
a schema required scenario.

*3.* The Go SDK does recursive type analysis to be able encode types
<https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/serialize.go#L346>
for coders anyway, as Go has no native concept of "serializable types" or
"serializable functions" It wouldn't be too much of a stretch to convert
this representation to a Portable Schema representation.

When materializing types, Go has extensively defined Type Conversion rules
<https://golang.org/ref/spec#Conversions> which are accessible via the
reflect package. This means that we can always synthetically create an
instance of a real type from something like a schema, assuming they match
field for field. Eg. If a user declares a PCollection with a given Schema,
then in principle it would be possible to use that PCollection as an input
with a field for field compatible real struct type, and have this verified
at construction time. The "extra sauce" would be to have this happen for a
subset of fields for convenient extraction, ala the annotation use in java.

In particular, this means that whenever the Go SDK is in a scenario that it
doesn't have a schema*, it could probably create one ad-hoc *for that
context, and use the atomic coder the rest of the time if available.
Whether we want it do so is another matter, and probably situation specific.

*4. *It seems Long Term (in that it will be eventually be done, not that it
will necessarily take a long time to get there), that Schemas are likely
the interchange format for Cross Language pipeline support. That is, when
an SDK is invoking a transform in a different language (say, Beam Go
calling on Beam SQL), the values could be specified, and returned in the
schema format, to ensure compatibility. The trick here is that the expected
return schema still needs to be explicitly specified from the user in some
circumstances. (eg. Going from a SQL statement -> Schema doesn't seem like
a natural fit, and won't necessarily be available at pipeline construction
time in the remote language.)

*5.* An interesting aspect of schemas is that they fundamentally enable
SDKs to start with a light DSL layer with "known" types and
transforms/combines/joins, which then never need to be invoked on the SDK
layer. Runners could each implement schemas directly and avoid unnecessary
FnAPI hops for improved performance, largely because they know the type's
structure. No need for any of it to be implemented SDK side to start.

 Overall this is a noble goal in that it enables more languages more
easily, but it's concerning from my view, in that the other goal is to
enable data processing in the SDK language, and this moves it farther away
from the more general, if verbose approaches to do the same thing.

I'm on the side of Scalable Data Processing in Go, which ideally entails
writing Go, rather than an abstract DSL.


I don't speak for all Go users, and welcome hearing from others.

On Thu, 3 Jan 2019 at 17:52 Robert Burke <rob...@frantil.com> wrote:

> At this point I feel like the schema discussion should be a separate
> thread from having a Coder Registry in Go, which was the original topic, so
> I'm forking it.
>
> It does sounds like adding Schemas to the Go SDK would be a much larger
> extension than the registry.
>
> I'm not convinced that not having a convenient registry would serve Go SDK
> users (such as they exist).
>
> The concern I have isn't so much for Ints or Doubles, but for user types
> such as Protocol Buffers, but not just those. There will be some users who
> prize efficiency first, and readability second. The Go SDK presently uses
> JSON encoding by default which has many of the properties of schemas, but
> is severely limiting for power users.
>
>
> It sounds like the following are true:
> 1. Full use of the Schemas in the Go SDK will require FnAPI support.
> * Until the FnAPI supports it, and the semantics are implemented in the
> ULR, the Go SDK probably shouldn't begin to implement against it.
> * This is identical to Go's lack of SplitableDoFn keeping Beam Go
> pipelines from scaling or from having Cross Language IO, which is also a
> precursor to BeamGo using Beam SQL.
> 2. The main collision between Schemas and Coders are in the event that a
> given type has both defined for it: Which is being used and when?
> * This seems to me more to do with being able to enable use of the
> syntactic sugar or not, but we know that at construction time, by the very
> use of the sugar.
> * If a file wants to materialize a file encoded with the Schema, one would
> need to encode that in the DoFn doing the writing somehow (eg. ForceSchema
> or ForceCoder, whichever we want to make the default). This has pipeline
> compatibility implications.
>
> It's not presently possible for Go to annotate function parameters, but
> something could be worked out, similarly to how SideInputs are configured
> in the Go SDK. I'd be concerned about the efficiency of those operations
> though, even with Generics or code generation.
>
>
> On Thu, 3 Jan 2019 at 16:33 Reuven Lax <re...@google.com> wrote:
>
>> On Fri, Jan 4, 2019 at 1:19 AM Robert Burke <rob...@frantil.com> wrote:
>>
>>> Very interesting Reuven!
>>>
>>> That would be a huge readability improvement, but it would also be a
>>> significant investment over my time budget to implement them on the Go side
>>> correctly. I would certainly want to read your documentation before going
>>> ahead.  Will the Portability FnAPI have dedicated Schema support? That
>>> would certainly change things.
>>>
>>
>> Yes, there's absolutely a plan to add schema definitions to the FnAPI.
>> This is what will allow you to use SQL from BeamGo
>>
>>>
>>> It's not clear to me how one might achieve the inversion from
>>> SchemaCoder being a special casing of CustomCoder to the other way around,
>>> since a field has a type, and that type needs to be encoded. Short of
>>> always encoding the primitive values in the way Beam prefers, it doesn't
>>> seem to allow for customizing the encoding on output, or really say
>>> anything outside of the (admittedly excellent) syntactic sugar demonstrated
>>> with the Java API.
>>>
>>
>> I'm not quite sure I understand. But schemas define a fixed set of
>> primitive types, and also define the encodings for those primitive types.
>> If a user wants custom encoding for a primitive type, they can create a
>> byte-array field and wrap that field with a Coder (this is why I said that
>> todays Coders are simply special cases); this should be very rare though,
>> as users rarely should care how Beam encodes a long or a double.
>>
>>>
>>> Offhand, Schemas seem to be an alternative to pipeline construction,
>>> rather than coders for value serialization, allowing manual field
>>> extraction code to be omitted. They do not appear to be a fundamental
>>> approach to achieve it. For example, the grouping operation still needs to
>>> encode the whole of the object as a value.
>>>
>>
>> Schemas are properties of the data - essentially a Schema is the data
>> type of a PCollection. In Java Schemas are also understood by ParDo, so you
>> can write a ParDo like this:
>>
>> @ProcessElement
>> public void process(@Field("user") String userId,  @Field("country")
>> String countryCode) {
>> }
>>
>> These extra functionalities are part of the graph, but they are enabled
>> by schemas.
>>
>>>
>>> As mentioned, I'm hoping to have a solution for existing coders by
>>> January's end, so waiting for your documentation doesn't work on that
>>> timeline.
>>>
>>
>> I don't think we need to wait for all the documentation to be written.
>>
>>
>>>
>>> That said, they aren't incompatible ideas as demonstrated by the Java
>>> implementation. The Go SDK remains in an experimental state. We can change
>>> things should the need arise in the next few months. Further, whenever 
>>> Generics
>>> in Go
>>> <https://go.googlesource.com/proposal/+/master/design/go2draft-generics-overview.md>
>>> crop up, the existing user surface and execution stack will need to be
>>> re-written to take advantage of them anyway. That provides an opportunity
>>> to invert Coder vs Schema dependence while getting a nice performance
>>> boost, and cleaner code (and deleting much of my code generator).
>>>
>>> ----
>>>
>>> Were I to implement schemas to get the same syntatic benefits as the
>>> Java API, I'd be leveraging the field annotations Go has. This satisfies
>>> the protocol buffer issue as well, since generated go protos have name &
>>> json annotations. Schemas could be extracted that way. These are also
>>> available to anything using static analysis for more direct generation of
>>> accessors. The reflective approach would also work, which is excellent for
>>> development purposes.
>>>
>>> The rote code that the schemas were replacing would be able to be
>>> cobbled together into efficient DoFn and CombineFns for serialization. At
>>> present, it seems like it could be implemented as a side package that uses
>>> beam, rather than changing portions of the core beam Go packages, The real
>>> trick would be to do so without "apply" since that's not how the Go SDK is
>>> shaped.
>>>
>>>
>>>
>>>
>>> On Thu, 3 Jan 2019 at 15:34 Gleb Kanterov <g...@spotify.com> wrote:
>>>
>>>> Reuven, it sounds great. I see there is a similar thing to Row coders
>>>> happening in Apache Arrow <https://arrow.apache.org>, and there is a
>>>> similarity between Apache Arrow Flight
>>>> <https://www.slideshare.net/wesm/apache-arrow-at-dataengconf-barcelona-2018/23>
>>>> and data exchange service in portability. How do you see these two things
>>>> relate to each other in the long term?
>>>>
>>>> On Fri, Jan 4, 2019 at 12:13 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> The biggest advantage is actually readability and usability. A
>>>>> secondary advantage is that it means that Go will be able to interact
>>>>> seamlessly with BeamSQL, which would be a big win for Go.
>>>>>
>>>>> A schema is basically a way of saying that a record has a specific set
>>>>> of (possibly nested, possibly repeated) fields. So for instance let's say
>>>>> that the user's type is a struct with fields named user, country,
>>>>> purchaseCost. This allows us to provide transforms that operate on field
>>>>> names. Some example (using the Java API):
>>>>>
>>>>> PCollection users = events.apply(Select.fields("user"));  // Select
>>>>> out only the user field.
>>>>>
>>>>> PCollection joinedEvents =
>>>>> queries.apply(Join.innerJoin(clicks).byFields("user"));  // Join two
>>>>> PCollections by user.
>>>>>
>>>>> // For each country, calculate the total purchase cost as well as the
>>>>> top 10 purchases.
>>>>> // A new schema is created containing fields total_cost and
>>>>> top_purchases, and rows are created with the aggregation results.
>>>>> PCollection purchaseStatistics = events.apply(
>>>>>     Group.byFieldNames("country")
>>>>>                .aggregateField("purchaseCost", Sum.ofLongs(),
>>>>> "total_cost"))
>>>>>                 .aggregateField("purchaseCost", Top.largestLongs(10),
>>>>> "top_purchases"))
>>>>>
>>>>>
>>>>> This is far more readable than what we have today, and what unlocks
>>>>> this is that Beam actually knows the structure of the record instead of
>>>>> assuming records are uncrackable blobs.
>>>>>
>>>>> Note that a coder is basically a special case of a schema that has a
>>>>> single field.
>>>>>
>>>>> In BeamJava we have a SchemaRegistry which knows how to turn user
>>>>> types into schemas. We use reflection to analyze many user types (e.g.
>>>>> simple POJO structs, JavaBean classes, Avro records, protocol buffers,
>>>>> etc.) to determine the schema, however this is done only when the graph is
>>>>> initially generated. We do use code generation (in Java we do bytecode
>>>>> generation) to make this somewhat more efficient. I'm willing to bet that
>>>>> the code generator you've written for structs could be very easily 
>>>>> modified
>>>>> for schemas instead, so it would not be wasted work if we went with 
>>>>> schemas.
>>>>>
>>>>> One of the things I'm working on now is documenting Beam schemas. They
>>>>> are already very powerful and useful, but since there is still nothing in
>>>>> our documentation about them, they are not yet widely used. I expect to
>>>>> finish draft documentation by the end of January.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Thu, Jan 3, 2019 at 11:32 PM Robert Burke <r...@google.com> wrote:
>>>>>
>>>>>> That's an interesting idea. I must confess I don't rightly know the
>>>>>> difference between a schema and coder, but here's what I've got with a 
>>>>>> bit
>>>>>> of searching through memory and the mailing list. Please let me know if 
>>>>>> I'm
>>>>>> off track.
>>>>>>
>>>>>> As near as I can tell, a schema, as far as Beam takes it
>>>>>> <https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java>
>>>>>>  is
>>>>>> a mechanism to define what data is extracted from a given row of data. So
>>>>>> in principle, there's an opportunity to be more efficient with data with
>>>>>> many columns that aren't being used, and only extract the data that's
>>>>>> meaningful to the pipeline.
>>>>>> The trick then is how to apply the schema to a given serialization
>>>>>> format, which is something I'm missing in my mental model (and then how 
>>>>>> to
>>>>>> do it efficiently in Go).
>>>>>>
>>>>>> I do know that the Go client package for BigQuery
>>>>>> <https://godoc.org/cloud.google.com/go/bigquery#hdr-Schemas> does
>>>>>> something like that, using field tags. Similarly, the "encoding/json"
>>>>>> <https://golang.org/doc/articles/json_and_go.html> package in the Go
>>>>>> Standard Library permits annotating fields and it will read out and
>>>>>> deserialize the JSON fields and that's it.
>>>>>>
>>>>>> A concern I have is that Go (at present) would require pre-compile
>>>>>> time code generation for schemas to be efficient, and they would still
>>>>>> mostly boil down to turning []bytes into real structs. Go reflection
>>>>>> doesn't keep up.
>>>>>> Go has no mechanism I'm aware of to Just In Time compile more
>>>>>> efficient processing of values.
>>>>>> It's also not 100% clear how Schema's would play with protocol
>>>>>> buffers or similar.
>>>>>> BigQuery has a mechanism of generating a JSON schema from a proto file
>>>>>> <https://github.com/GoogleCloudPlatform/protoc-gen-bq-schema>, but
>>>>>> that's only the specification half, not the using half.
>>>>>>
>>>>>> As it stands, the code generator I've been building these last months
>>>>>> could (in principle) statically analyze a user's struct, and then 
>>>>>> generate
>>>>>> an efficient dedicated coder for it. It just has no where to put them 
>>>>>> such
>>>>>> that the Go SDK would use it.
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 3, 2019 at 1:39 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> I'll make a different suggestion. There's been some chatter that
>>>>>>> schemas are a better tool than coders, and that in Beam 3.0 we should 
>>>>>>> make
>>>>>>> schemas the basic semantics instead of coders. Schemas provide 
>>>>>>> everything a
>>>>>>> coder provides, but also allows for far more readable code. We can't 
>>>>>>> make
>>>>>>> such a change in Beam Java 2.X for compatibility reasons, but maybe in 
>>>>>>> Go
>>>>>>> we're better off starting with schemas instead of coders?
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Thu, Jan 3, 2019 at 8:45 PM Robert Burke <rob...@frantil.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> One area that the Go SDK currently lacks: is the ability for users
>>>>>>>> to specify their own coders for types.
>>>>>>>>
>>>>>>>> I've written a proposal document,
>>>>>>>> <https://docs.google.com/document/d/1kQwx4Ah6PzG8z2ZMuNsNEXkGsLXm6gADOZaIO7reUOg/edit#>
>>>>>>>>  and
>>>>>>>> while I'm confident about the core, there are certainly some edge cases
>>>>>>>> that require discussion before getting on with the implementation.
>>>>>>>>
>>>>>>>> At presently, the SDK only permits primitive value types (all
>>>>>>>> numeric types but complex, strings, and []bytes) which are coded with 
>>>>>>>> beam
>>>>>>>> coders, and structs whose exported fields are of those type, which is 
>>>>>>>> then
>>>>>>>> encoded as JSON. Protocol buffer support is hacked in to avoid the type
>>>>>>>> anaiyzer, and presents the current work around this issue.
>>>>>>>>
>>>>>>>> The high level proposal is to catch up with Python and Java, and
>>>>>>>> have a coder registry. In addition, arrays, and maps should be 
>>>>>>>> permitted as
>>>>>>>> well.
>>>>>>>>
>>>>>>>> If you have alternatives, or other suggestions and opinions, I'd
>>>>>>>> love to hear them! Otherwise my intent is to get a PR ready by the end 
>>>>>>>> of
>>>>>>>> January.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Robert Burke
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> http://go/where-is-rebo
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Cheers,
>>>> Gleb
>>>>
>>>

Reply via email to