On Thu, Jun 13, 2019 at 8:42 PM Reuven Lax <re...@google.com> wrote:
>
> Spoke to Brian about his proposal. It is essentially this:
>
> We create PortableSchemaCoder, with a well-known URN. This coder is
> parameterized by the schema (i.e. list of field name -> field type pairs).

Given that we have a field type that is (list of field names -> field type pairs), is there a reason to do this enumeration at the top level as well? This would likely also eliminate some of the strangeness where we want to treat a PCollection with a single-field row as a PCollection with just that value instead.

> Java also continues to have its own CustomSchemaCoder. This is parameterized
> by the schema as well as the to/from functions needed to make the Java API
> "nice."
>
> When the expansion service expands a Java PTransform for usage across
> languages, it will add a transform mapping the PCollection with
> CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way
> Java can maintain the information needed to maintain its API (and Python can
> do the same), but there's no need to shove this information into the
> well-known portable representation.
>
> Brian, can you confirm that this was your proposal? If so, I like it.

The major downside of this that I see is that it assumes that transparency is only needed at certain "boundaries" and everything between these boundaries is opaque. I think we'd be better served by a format where schemas are transparently represented throughout. For example, the "boundaries" between runner and SDK are not known at pipeline construction time, and we want the runner <-> SDK communication to understand the schemas to be able to use more efficient transport mechanisms (e.g. batches of arrow records). It may also be common for a pipeline in language X to invoke two transforms in language Y in succession (e.g. two SQL statements), in which case introducing two extra transforms in the expansion service would be wasteful. I also think we want to allow the flexibility for runners to swap out transforms as optimizations regardless of construction-time boundaries (e.g. implementing a projection natively, rather than outsourcing to the SDK).
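
To make the question at the top of this reply concrete, here is a rough Python sketch of a coder parameterized by a single field type, where a row is just one kind of field type. Every name and URN in it (FieldType, SchemaCoder, the "example:" URNs) is made up for illustration and is not taken from the Beam protos; it only shows the shape of the idea, not a proposed definition.

```python
from dataclasses import dataclass
from typing import Tuple

# Hypothetical URNs, invented for this sketch; not the URNs Beam defines.
INT64_URN = "example:fieldtype:int64"
STRING_URN = "example:fieldtype:string"
ROW_URN = "example:fieldtype:row"


@dataclass(frozen=True)
class FieldType:
    """A field type identified by URN; a row type also carries its fields."""
    urn: str
    # Ordered (field name, field type) pairs; empty for non-row types.
    fields: Tuple[Tuple[str, "FieldType"], ...] = ()


def row(*fields):
    """Build a row field type from (name, FieldType) pairs."""
    return FieldType(ROW_URN, tuple(fields))


@dataclass(frozen=True)
class SchemaCoder:
    """Hypothetical coder whose only parameter is one FieldType."""
    element_type: FieldType


INT64 = FieldType(INT64_URN)
STRING = FieldType(STRING_URN)

# A PCollection of rows: the top level is just a row-typed FieldType.
user = row(("name", STRING), ("age", INT64))
coder_of_rows = SchemaCoder(user)

# A PCollection of bare integers needs no single-field wrapper row,
# and stays distinguishable from one that really is a one-field row.
coder_of_ints = SchemaCoder(INT64)
coder_of_wrapped = SchemaCoder(row(("value", INT64)))
```

With this shape there is no separate top-level enumeration of fields: the field list lives only inside the row type, and the single-field-row special case does not arise.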

Are the to/from conversion functions the only extra information needed to make the Java APIs nice? If so, can they be attached to the operations themselves (where it seems they're actually needed/used), rather than to the schema/coder of the PCollection? Alternatively, I'd prefer this be opaque metadata attached to a transparent schema rather than making the whole schema opaque.

> We've gone back and forth discussing abstracts for over a month now. I
> suggest that the next step should be to create a PR, and move discussion to
> that PR. Having actual code can often make discussion much more concrete.

+1 to a PR, though I feel like there are fundamental high-level issues that are still not decided. (I suppose we should be open to throwing whole PRs away in that case.) There are certainly pieces that we know we'll need (like the ability to serialize a row consistently in all languages) that we can get in immediately.

> Reuven
>
> On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>
>>> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>> Can we choose a first step? I feel there's consensus around:
>>>>
>>>> - the basic idea of what a schema looks like, ignoring logical types or
>>>> SDK-specific bits
>>>> - the version of logical type which is a standardized URN+payload plus a
>>>> representation
>>>>
>>>> Perhaps we could commit this and see what it looks like to try to use it?
>>
>>
>> +1
>>
>>>>
>>>> It also seems like there might be consensus around the idea of each of:
>>>>
>>>> - a coder that simply encodes rows; its payload is just a schema; it is
>>>> minimalist, canonical
>>>>
>>>> - a coder that encodes a non-row using the serialization format of a row;
>>>> this has to be a coder (versus Convert transforms) so that to/from row
>>>> conversions can be elided when primitives are fused (just like to/from
>>>> bytes is elided)
>>
>>
>> So, to make it concrete, in the Beam protos we would have an
>> [Elementwise]SchemaCoder whose single parameterization would be FieldType,
>> whose definition is in terms of URN + payload + components (+
>> representation, for non-primitive types, some details TBD there). It could
>> be deserialized into various different Coder instances (an SDK
>> implementation detail) in an SDK depending on the type. One of the most
>> important primitive field types is Row (aka Struct).
>>
>> We would define a byte encoding for each primitive type. We *could* choose
>> to simply require that the encoding of any non-row primitive is the same as
>> its encoding in a single-member row, but that's not necessary.
>>
>> In the short term, the window/timestamp/pane info would still live outside
>> via an enclosing WindowCoder, as it does now, not blocking on a desirable
>> but still-to-be-figured-out unification at that level.
>>
>> This seems like a good path forward.
>>
>>> Actually this doesn't make sense to me. I think from the portability
>>> perspective, all we have is schemas - the rest is just a convenience for
>>> the SDK. As such, I don't think it makes sense at all to model this as a
>>> Coder.
>>
>>
>> Coder and Schemas are mutually exclusive on PCollections, and completely
>> specify type information, so I think it makes sense to reuse this (as we're
>> currently doing) until we can get rid of coders altogether.
>>
>> (At execution time, we would generalize the notion of a coder to indicate
>> how *batches* of elements are encoded, not just how individual elements are
>> encoded. Here we have the option of letting the runner pick depending on the
>> use (e.g. elementwise for key lookups vs. arrow for bulk data channel
>> transfer vs ???, possibly with parameters like "preferred batch size") or
>> standardizing on one physical byte representation for all communication over
>> the boundary.)
>>
>>>
>>>
>>>>
>>>>
>>>> Can we also just have both of these, with different URNs?
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jun 12, 2019 at 3:57 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jun 12, 2019 at 3:46 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>>
>>>>>> On Tue, Jun 11, 2019 at 8:04 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>>
>>>>>>>
>>>>>>> I believe the schema registry is a transient construction-time concept.
>>>>>>> I don't think there's any need for a concept of a registry in the
>>>>>>> portable representation.
>>>>>>>
>>>>>>>> I'd rather urn:beam:schema:logicaltype:javasdk not be used whenever
>>>>>>>> one has (say) a Java POJO as that would prevent other SDKs from
>>>>>>>> "understanding" it as above (unless we had a way of declaring it as
>>>>>>>> "just an alias/wrapper").
>>>>>>>
>>>>>>>
>>>>>>> I didn't understand the example I snipped, but I think I understand
>>>>>>> your concern here. Is this what you want? (a) something presented as a
>>>>>>> POJO in Java (b) encoded to a row, but still decoded to the POJO and
>>>>>>> (c) non-Java SDK knows that it is "just a struct" so it is safe to mess
>>>>>>> about with or even create new ones. If this is what you want it seems
>>>>>>> potentially useful, but also easy to live without. This can also be
>>>>>>> done entirely within the Java SDK via conversions, leaving no logical
>>>>>>> type in the portable pipeline.
>>>>>>
>>>>>>
>>>>>> I'm imagining a world where someone defines a PTransform that takes a POJO
>>>>>> for a constructor, and consumes and produces a POJO, and is now usable
>>>>>> from Go with no additional work on the PTransform author's part. But
>>>>>> maybe I'm thinking about this wrong and the POJO <-> Row conversion is
>>>>>> part of the @ProcessElement magic, not encoded in the schema itself.
>>>>>
>>>>>
>>>>> The user's output would have to be explicitly schema. They would somehow
>>>>> have to tell Beam to infer a schema from the output POJO (e.g. one way
>>>>> to do this is to annotate the POJO with the @DefaultSchema annotation).
>>>>> We don't currently magically turn a POJO into a schema unless we are
>>>>> asked to do so.