Hey Gleb, thanks for bringing this up. The PR that was reverted (8853) is the same one that I referred to earlier in this thread. It modified the existing portable schema representation to match what we settled on here - and yes it removed support for logical types like fixed bytes. I (foolishly) assumed the portable schema representation wasn't actually used anywhere yet, so I figured it would be a harmless regression until we were able to add support again with a logical type registry (BEAM-7855 [1]). But it turns out the proto representation is used to build cloud objects in the dataflow runner, and so the change led to issues in Dataflow [2]. Thus the reversion.
I absolutely agree now we should finish BEAM-7855 first before making this change again. I'm also taking a look at some of the tests that should have caught this issue to see how it slipped through the cracks. Brian [1] https://issues.apache.org/jira/browse/BEAM-7855 [2] https://issues.apache.org/jira/browse/BEAM-8111 On Tue, Sep 3, 2019 at 7:27 AM Gleb Kanterov <[email protected]> wrote: > Recently there was a pull request (that was reverted) for adding portable > representation of schemas. It's great to see things moving forward, I'm > worried that it doesn't support any logical types, especially fixed bytes. > That makes runners using portable schemas unusable, for instance, when > reading certain Avro files, while it was possible before. I'm wondering if > it would be possible to include logical types into initial implementation > or add a feature flag to opt-in to portable schema representation. > > [1]: https://github.com/apache/beam/pull/8853 > > Gleb > > On Wed, Jun 19, 2019 at 7:08 PM Kenneth Knowles <[email protected]> wrote: > >> Seems like a practical approach to get moving on things. Just to restate >> my understanding: >> >> - in Java it is PCollection<T> but with the row coder holding >> to/from/clazz (I'm calling it row coder because row is the binary format, >> while schemas may have multiple possible formats) >> - in portability, the URN + payload "is" the type, so it is >> PCollection<row, to, from, clazz> >> - in Python it is interpreted as PCollection<row> >> >> We may eventually hit an issue caused by the asymmetry between >> portability, the authoring SDK (in this case Java), and the external SDK >> (in this case hypothetically Python). But perhaps the asymmetry is >> natural/harmless. We may eventually want it in the general form of the >> payload carrying identifying info of what the authoring SDK is. >> >> As to attaching conversions to operations rather than coders, I'm not so >> sure. It gets at the two natures of coders: >> >> 1. URN + payload fully specify how to interpret the bytes of an element >> 2. An SDK's implementation of the coder for a URN + payload is a contract >> with DoFns authored in that SDK what SDK-specific type they will receive >> >> The idea of storing the to/from conversions on operations themselves >> would be a sort of decoupling of 1 and 2. Assuming we design something that >> still allows eliding conversions, I expect that will look nearly identical. >> Coders are representation to/from Bytes, while this conversion layer is T >> to/from representation. So I might think we can do it without adding >> anything to the model. >> >> Kenn >> >> On Tue, Jun 18, 2019 at 11:04 PM Reuven Lax <[email protected]> wrote: >> >>> Robert, you are correct that in principle the to/from functions are >>> needed on the operation, as that's where automatic conversion happens (in >>> Java it happens in DoFnRunner). However there are two blockers there: >>> >>> 1. As Brian mentioned, the issue in Java is that we never have >>> PCollection<Row> in this case. The source PCollection will simply be >>> PCollection<T>, where T has a schema. The to/from functions are now >>> required to interpret this PCollection. Currently we need to put it on the >>> PCollection itself to may Java's type system happy (an alternative is to >>> always create an intermediate PCollection<Row>, but that would be >>> computationally expensive). We might be able to find a way to model this in >>> Java with the to/from on the operation, however I suspect it would be >>> difficult and a lot of work. >>> >>> 2. I believe there are some cases where PTransforms access the to/from >>> functions in expand(), which is before we have an operation to attach the >>> those functions to. Again this is presumably solvable, but would require >>> design and more work. >>> >>> 3. Currently the user can call setSchema on any PCollection, and pass in >>> to/from functions there. We would have to rethink this API. >>> >>> So I think leaving it in the coder is the pragmatic approach for now, >>> though it would be interesting to see if we could solve the above issues >>> and instead automatically propagate the functions to the operation. >>> >>> I agree that we should not make these things opaque in the portable >>> representation, if only for ease of debugging. However they should not be >>> needed for cross-language calls. >>> >>> Reuven >>> >>> On Tue, Jun 18, 2019 at 5:09 AM Robert Bradshaw <[email protected]> >>> wrote: >>> >>>> Thanks for updating that alternative. >>>> >>>> As for the to/from functions, it does seem pragmatic to dangle them >>>> off the purely portable representation (either as a field there, or as >>>> an opaque logical type whose payload contains the to/from functions, >>>> or a separate coder that wraps the schema coder (though I can't see >>>> how the latter would work well if nesting is allowed)) until we figure >>>> out a good way to attach them to the operations themselves. >>>> >>>> On Tue, Jun 18, 2019 at 2:37 AM Brian Hulette <[email protected]> >>>> wrote: >>>> > >>>> > Realized I completely ignored one of your points, added another >>>> response inline. >>>> > >>>> > On Fri, Jun 14, 2019 at 2:20 AM Robert Bradshaw <[email protected]> >>>> wrote: >>>> >> >>>> >> On Thu, Jun 13, 2019 at 8:42 PM Reuven Lax <[email protected]> wrote: >>>> >> > >>>> >> > Spoke to Brian about his proposal. It is essentially this: >>>> >> > >>>> >> > We create PortableSchemaCoder, with a well-known URN. This coder >>>> is parameterized by the schema (i.e. list of field name -> field type >>>> pairs). >>>> >> >>>> >> Given that we have a field type that is (list of field names -> field >>>> >> type pairs), is there a reason to do this enumeration at the top >>>> level >>>> >> as well? This would likely also eliminate some of the strangeness >>>> >> where we want to treat a PCollection with a single-field row as a >>>> >> PCollection with just that value instead. >>>> > >>>> > >>>> > This is part of what I was suggesting in my "Root schema is a logical >>>> type" alternative [1], except that the language about SDK-specific logical >>>> types is now obsolete. I'll update it to better reflect this alternative. >>>> > I do think at the very least we should just have one (list of field >>>> names -> field type pairs) that is re-used, which is what I did in my PR >>>> [2]. >>>> > >>>> > [1] >>>> https://docs.google.com/document/d/1uu9pJktzT_O3DxGd1-Q2op4nRk4HekIZbzi-0oTAips/edit#heading=h.7570feur1qin >>>> > [2] >>>> https://github.com/apache/beam/pull/8853/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2L686 >>>> > >>>> >> >>>> >> >>>> >> > Java also continues to have its own CustomSchemaCoder. This is >>>> parameterized by the schema as well as the to/from functions needed to make >>>> the Java API "nice." >>>> >> > >>>> >> > When the expansion service expands a Java PTransform for usage >>>> across languages, it will add a transform mapping the PCollection with >>>> CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way >>>> Java can maintain the information needed to maintain its API (and Python >>>> can do the same), but there's no need to shove this information into the >>>> well-known portable representation. >>>> >> > >>>> >> > Brian, can you confirm that this was your proposal? If so, I like >>>> it. >>>> >> >>>> >> The major downside of this that I see is that it assumes that >>>> >> transparency is only needed at certain "boundaries" and everything >>>> >> between these boundaries is opaque. I think we'd be better served by >>>> a >>>> >> format where schemas are transparently represented throughout. For >>>> >> example, the "boundaries" between runner and SDK are not known at >>>> >> pipeline construction time, and we want the runner <-> SDK >>>> >> communication to understand the schemas to be able to use more >>>> >> efficient transport mechanisms (e.g. batches of arrow records). It >>>> may >>>> >> also be common for a pipeline in language X to invoke two transforms >>>> >> in language Y in succession (e.g. two SQL statements) in which case >>>> >> introducing two extra transforms in the expansion service would be >>>> >> wasteful. I also think we want to allow the flexibility for runners >>>> to >>>> >> swap out transforms an optimizations regardless of construction-time >>>> >> boundaries (e.g. implementing a projection natively, rather than >>>> >> outsourcing to the SDK). >>>> >> >>>> >> Are the to/from conversion functions the only extra information >>>> needed >>>> >> to make the Java APIs nice? If so, can they be attached to the >>>> >> operations themselves (where it seems they're actually needed/used), >>>> >> rather than to the schema/coder of the PCollection? Alternatively, >>>> I'd >>>> >> prefer this be opaque metadata attached to a transparent schema >>>> rather >>>> >> than making the whole schema opaque. >>>> >> >>>> >> > We've gone back and forth discussing abstracts for over a month >>>> now. I suggest that the next step should be to create a PR, and move >>>> discussion to that PR. Having actual code can often make discussion much >>>> more concrete. >>>> >> >>>> >> +1 to a PR, though I feel like there are fundamental high-level >>>> issues >>>> >> that are still not decided. (I suppose we should be open to throwing >>>> >> whole PRs away in that case.) There are certainly pieces that we'll >>>> >> know that we need (like the ability to serialize a row consistently >>>> in >>>> >> all languages) we can get in immediately. >>>> >> >>>> >> > Reuven >>>> >> > >>>> >> > On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw < >>>> [email protected]> wrote: >>>> >> >> >>>> >> >> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax <[email protected]> >>>> wrote: >>>> >> >>> >>>> >> >>> >>>> >> >>> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles <[email protected]> >>>> wrote: >>>> >> >>>> >>>> >> >>>> Can we choose a first step? I feel there's consensus around: >>>> >> >>>> >>>> >> >>>> - the basic idea of what a schema looks like, ignoring logical >>>> types or SDK-specific bits >>>> >> >>>> - the version of logical type which is a standardized >>>> URN+payload plus a representation >>>> >> >>>> >>>> >> >>>> Perhaps we could commit this and see what it looks like to try >>>> to use it? >>>> >> >> >>>> >> >> >>>> >> >> +1 >>>> >> >> >>>> >> >>>> >>>> >> >>>> It also seems like there might be consensus around the idea of >>>> each of: >>>> >> >>>> >>>> >> >>>> - a coder that simply encodes rows; its payload is just a >>>> schema; it is minimalist, canonical >>>> >> >>>> >>>> >> >>>> - a coder that encodes a non-row using the serialization >>>> format of a row; this has to be a coder (versus Convert transforms) so that >>>> to/from row conversions can be elided when primitives are fused (just like >>>> to/from bytes is elided) >>>> >> >> >>>> >> >> >>>> >> >> So, to make it concrete, in the Beam protos we would have an >>>> [Elementwise]SchemaCoder whose single parameterization would be FieldType, >>>> whose definition is in terms of URN + payload + components (+ >>>> representation, for non-primitive types, some details TBD there). It could >>>> be deserialized into various different Coder instances (an SDK >>>> implementation detail) in an SDK depending on the type. One of the most >>>> important primitive field types is Row (aka Struct). >>>> >> >> >>>> >> >> We would define a byte encoding for each primitive type. We >>>> *could* choose to simply require that the encoding of any non-row primitive >>>> is the same as its encoding in a single-member row, but that's not >>>> necessary. >>>> >> >> >>>> >> >> In the short term, the window/timestamp/pane info would still >>>> live outside via an enclosing WindowCoder, as it does now, not blocking on >>>> a desirable but still-to-be-figured-out unification at that level. >>>> >> >> >>>> >> >> This seems like a good path forward. >>>> >> >> >>>> >> >>> Actually this doesn't make sense to me. I think from the >>>> portability perspective, all we have is schemas - the rest is just a >>>> convenience for the SDK. As such, I don't think it makes sense at all to >>>> model this as a Coder. >>>> >> >> >>>> >> >> >>>> >> >> Coder and Schemas are mutually exclusive on PCollections, and >>>> completely specify type information, so I think it makes sense to reuse >>>> this (as we're currently doing) until we can get rid of coders altogether. >>>> >> >> >>>> >> >> (At execution time, we would generalize the notion of a coder to >>>> indicate how *batches* of elements are encoded, not just how individual >>>> elements are encoded. Here we have the option of letting the runner pick >>>> depending on the use (e.g. elementwise for key lookups vs. arrow for bulk >>>> data channel transfer vs ???, possibly with parameters like "preferred >>>> batch size") or standardizing on one physical byte representation for all >>>> communication over the boundary.) >>>> >> >> >>>> >> >>> >>>> >> >>> >>>> >> >>>> >>>> >> >>>> >>>> >> >>>> Can we also just have both of these, with different URNs? >>>> >> >>>> >>>> >> >>>> Kenn >>>> >> >>>> >>>> >> >>>> On Wed, Jun 12, 2019 at 3:57 PM Reuven Lax <[email protected]> >>>> wrote: >>>> >> >>>>> >>>> >> >>>>> >>>> >> >>>>> >>>> >> >>>>> On Wed, Jun 12, 2019 at 3:46 PM Robert Bradshaw < >>>> [email protected]> wrote: >>>> >> >>>>>> >>>> >> >>>>>> On Tue, Jun 11, 2019 at 8:04 PM Kenneth Knowles < >>>> [email protected]> wrote: >>>> >> >>>>>>> >>>> >> >>>>>>> >>>> >> >>>>>>> I believe the schema registry is a transient >>>> construction-time concept. I don't think there's any need for a concept of >>>> a registry in the portable representation. >>>> >> >>>>>>> >>>> >> >>>>>>>> I'd rather urn:beam:schema:logicaltype:javasdk not be used >>>> whenever one has (say) a Java POJO as that would prevent other SDKs from >>>> "understanding" it as above (unless we had a way of declaring it as "just >>>> an alias/wrapper"). >>>> >> >>>>>>> >>>> >> >>>>>>> >>>> >> >>>>>>> I didn't understand the example I snipped, but I think I >>>> understand your concern here. Is this what you want? (a) something >>>> presented as a POJO in Java (b) encoded to a row, but still decoded to the >>>> POJO and (c) non-Java SDK knows that it is "just a struct" so it is safe to >>>> mess about with or even create new ones. If this is what you want it seems >>>> potentially useful, but also easy to live without. This can also be done >>>> entirely within the Java SDK via conversions, leaving no logical type in >>>> the portable pipeline. >>>> >> >>>>>> >>>> >> >>>>>> >>>> >> >>>>>> I'm imaging a world where someone defines a PTransform that >>>> takes a POJO for a constructor, and consumes and produces a POJO, and is >>>> now usable from Go with no additional work on the PTransform author's >>>> part. But maybe I'm thinking about this wrong and the POJO <-> Row >>>> conversion is part of the @ProcesssElement magic, not encoded in the schema >>>> itself. >>>> >> >>>>> >>>> >> >>>>> >>>> >> >>>>> The user's output would have to be explicitly schema. They >>>> would somehow have to tell Beam the infer a schema from the output POJO >>>> (e.g. one way to do this is to annotate the POJO with the @DefaultSchema >>>> annotation). We don't currently magically turn a POJO into a schema unless >>>> we are asked to do so. >>>> >>> > > -- > Cheers, > Gleb >
