Thanks, Brian. It makes sense; it wasn't entirely clear from the commit messages, which is why I wanted to double-check.
On Tue, Sep 3, 2019 at 5:43 PM Brian Hulette <[email protected]> wrote:

> Hey Gleb, thanks for bringing this up. The PR that was reverted (8853) is the same one that I referred to earlier in this thread. It modified the existing portable schema representation to match what we settled on here - and yes, it removed support for logical types like fixed bytes. I (foolishly) assumed the portable schema representation wasn't actually used anywhere yet, so I figured it would be a harmless regression until we were able to add support again with a logical type registry (BEAM-7855 [1]). But it turns out the proto representation is used to build cloud objects in the Dataflow runner, and so the change led to issues in Dataflow [2]. Thus the reversion.
>
> I absolutely agree now that we should finish BEAM-7855 first before making this change again. I'm also taking a look at some of the tests that should have caught this issue to see how it slipped through the cracks.
>
> Brian
>
> [1] https://issues.apache.org/jira/browse/BEAM-7855
> [2] https://issues.apache.org/jira/browse/BEAM-8111
>
> On Tue, Sep 3, 2019 at 7:27 AM Gleb Kanterov <[email protected]> wrote:
>
>> Recently there was a pull request (since reverted) adding a portable representation of schemas [1]. It's great to see things moving forward, but I'm worried that it doesn't support any logical types, especially fixed bytes. That makes runners using portable schemas unusable in some cases, for instance when reading certain Avro files, which was possible before. I'm wondering if it would be possible to include logical types in the initial implementation, or to add a feature flag to opt in to the portable schema representation.
>>
>> [1]: https://github.com/apache/beam/pull/8853
>>
>> Gleb
>>
>> On Wed, Jun 19, 2019 at 7:08 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> Seems like a practical approach to get moving on things. Just to restate my understanding:
>>>
>>> - in Java it is PCollection<T>, but with the row coder holding to/from/clazz (I'm calling it the row coder because row is the binary format, while schemas may have multiple possible formats)
>>> - in portability, the URN + payload "is" the type, so it is PCollection<row, to, from, clazz>
>>> - in Python it is interpreted as PCollection<row>
>>>
>>> We may eventually hit an issue caused by the asymmetry between portability, the authoring SDK (in this case Java), and the external SDK (in this case hypothetically Python). But perhaps the asymmetry is natural/harmless. We may eventually want it in the general form of the payload carrying identifying info about what the authoring SDK is.
>>>
>>> As to attaching conversions to operations rather than coders, I'm not so sure. It gets at the two natures of coders:
>>>
>>> 1. URN + payload fully specify how to interpret the bytes of an element
>>> 2. An SDK's implementation of the coder for a URN + payload is a contract with DoFns authored in that SDK about what SDK-specific type they will receive
>>>
>>> The idea of storing the to/from conversions on operations themselves would be a sort of decoupling of 1 and 2. Assuming we design something that still allows eliding conversions, I expect it will look nearly identical. Coders are representation to/from bytes, while this conversion layer is T to/from representation. So I think we can do it without adding anything to the model.
>>>
>>> Kenn
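To make the "row coder holding to/from/clazz" shape above concrete, here is a rough sketch using the Java SDK's SchemaCoder, where the coder carries the schema plus the T <-> Row conversion functions. The User type and its fields are invented for illustration, and the exact SchemaCoder.of signature has varied across Beam releases.

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.SchemaCoder;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.sdk.values.TypeDescriptor;

    public class UserSchemaCoderSketch {
      /** A plain Java type the pipeline author actually works with. */
      public static class User {
        public final String name;
        public final long age;
        public User(String name, long age) {
          this.name = name;
          this.age = age;
        }
      }

      public static SchemaCoder<User> userCoder() {
        // The schema describes the portable row representation.
        Schema schema =
            Schema.builder().addStringField("name").addInt64Field("age").build();

        // The to/from functions bridge User and its Row representation; the coder
        // itself only ever serializes the Row, so the bytes stay language-neutral.
        SerializableFunction<User, Row> toRow =
            user -> Row.withSchema(schema).addValues(user.name, user.age).build();
        SerializableFunction<Row, User> fromRow =
            row -> new User(row.getString("name"), row.getInt64("age"));

        return SchemaCoder.of(schema, TypeDescriptor.of(User.class), toRow, fromRow);
      }
    }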
>>> On Tue, Jun 18, 2019 at 11:04 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> Robert, you are correct that in principle the to/from functions are needed on the operation, as that's where the automatic conversion happens (in Java it happens in DoFnRunner). However, there are a few blockers there:
>>>>
>>>> 1. As Brian mentioned, the issue in Java is that we never have PCollection<Row> in this case. The source PCollection will simply be PCollection<T>, where T has a schema. The to/from functions are then required to interpret this PCollection. Currently we need to put them on the PCollection itself to make Java's type system happy (an alternative is to always create an intermediate PCollection<Row>, but that would be computationally expensive). We might be able to find a way to model this in Java with the to/from on the operation; however, I suspect it would be difficult and a lot of work.
>>>>
>>>> 2. I believe there are some cases where PTransforms access the to/from functions in expand(), which is before we have an operation to attach those functions to. Again, this is presumably solvable, but would require design and more work.
>>>>
>>>> 3. Currently the user can call setSchema on any PCollection and pass in to/from functions there. We would have to rethink this API.
>>>>
>>>> So I think leaving it in the coder is the pragmatic approach for now, though it would be interesting to see if we could solve the above issues and instead automatically propagate the functions to the operation.
>>>>
>>>> I agree that we should not make these things opaque in the portable representation, if only for ease of debugging. However, they should not be needed for cross-language calls.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Jun 18, 2019 at 5:09 AM Robert Bradshaw <[email protected]> wrote:
>>>>>
>>>>> Thanks for updating that alternative.
>>>>>
>>>>> As for the to/from functions, it does seem pragmatic to dangle them off the purely portable representation (either as a field there, or as an opaque logical type whose payload contains the to/from functions, or a separate coder that wraps the schema coder (though I can't see how the latter would work well if nesting is allowed)) until we figure out a good way to attach them to the operations themselves.
>>>>>
>>>>> On Tue, Jun 18, 2019 at 2:37 AM Brian Hulette <[email protected]> wrote:
>>>>> >
>>>>> > Realized I completely ignored one of your points; added another response inline.
>>>>> >
>>>>> > On Fri, Jun 14, 2019 at 2:20 AM Robert Bradshaw <[email protected]> wrote:
>>>>> >>
>>>>> >> On Thu, Jun 13, 2019 at 8:42 PM Reuven Lax <[email protected]> wrote:
>>>>> >> >
>>>>> >> > Spoke to Brian about his proposal. It is essentially this:
>>>>> >> >
>>>>> >> > We create PortableSchemaCoder, with a well-known URN. This coder is parameterized by the schema (i.e. a list of field name -> field type pairs).
>>>>> >>
>>>>> >> Given that we have a field type that is (list of field names -> field type pairs), is there a reason to do this enumeration at the top level as well? This would likely also eliminate some of the strangeness where we want to treat a PCollection with a single-field row as a PCollection with just that value instead.
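As a concrete illustration of the nesting Robert points at: in the Java SDK, a row is itself just one FieldType, so the (field name -> field type) enumeration composes rather than being special at the top level. A small sketch, assuming the Schema/FieldType builder API; the person/address schemas are invented for the example.

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.Schema.FieldType;

    public class NestedSchemaSketch {
      public static Schema personSchema() {
        Schema address =
            Schema.builder().addStringField("street").addStringField("city").build();

        // A row type is just another FieldType whose "payload" is a schema, so the
        // same field-name -> field-type enumeration is reused at every level.
        return Schema.builder()
            .addStringField("name")
            .addField("home", FieldType.row(address))
            .addField("nicknames", FieldType.array(FieldType.STRING))
            .build();
      }
    }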
>>>>> > This is part of what I was suggesting in my "Root schema is a logical type" alternative [1], except that the language about SDK-specific logical types is now obsolete. I'll update it to better reflect this alternative.
>>>>> > I do think at the very least we should just have one (list of field names -> field type pairs) that is re-used, which is what I did in my PR [2].
>>>>> >
>>>>> > [1] https://docs.google.com/document/d/1uu9pJktzT_O3DxGd1-Q2op4nRk4HekIZbzi-0oTAips/edit#heading=h.7570feur1qin
>>>>> > [2] https://github.com/apache/beam/pull/8853/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2L686
>>>>> >
>>>>> >> > Java also continues to have its own CustomSchemaCoder. This is parameterized by the schema as well as the to/from functions needed to make the Java API "nice."
>>>>> >> >
>>>>> >> > When the expansion service expands a Java PTransform for usage across languages, it will add a transform mapping the PCollection with CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way Java can keep the information needed to maintain its API (and Python can do the same), but there's no need to shove this information into the well-known portable representation.
>>>>> >> >
>>>>> >> > Brian, can you confirm that this was your proposal? If so, I like it.
>>>>> >>
>>>>> >> The major downside of this that I see is that it assumes that transparency is only needed at certain "boundaries" and everything between these boundaries is opaque. I think we'd be better served by a format where schemas are transparently represented throughout. For example, the "boundaries" between runner and SDK are not known at pipeline construction time, and we want the runner <-> SDK communication to understand the schemas to be able to use more efficient transport mechanisms (e.g. batches of Arrow records). It may also be common for a pipeline in language X to invoke two transforms in language Y in succession (e.g. two SQL statements), in which case introducing two extra transforms in the expansion service would be wasteful. I also think we want to allow the flexibility for runners to swap out transforms and optimizations regardless of construction-time boundaries (e.g. implementing a projection natively, rather than outsourcing to the SDK).
>>>>> >>
>>>>> >> Are the to/from conversion functions the only extra information needed to make the Java APIs nice? If so, can they be attached to the operations themselves (where it seems they're actually needed/used), rather than to the schema/coder of the PCollection? Alternatively, I'd prefer this be opaque metadata attached to a transparent schema rather than making the whole schema opaque.
>>>>> >>
>>>>> >> > We've gone back and forth discussing abstractions for over a month now. I suggest that the next step should be to create a PR, and move discussion to that PR. Having actual code can often make discussion much more concrete.
>>>>> >>
>>>>> >> +1 to a PR, though I feel like there are fundamental high-level issues that are still not decided. (I suppose we should be open to throwing whole PRs away in that case.) There are certainly pieces that we know we need (like the ability to serialize a row consistently in all languages) that we can get in immediately.
>>>>> >>
>>>>> >> > Reuven
>>>>> >> >
>>>>> >> > On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw <[email protected]> wrote:
>>>>> >> >>
>>>>> >> >> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax <[email protected]> wrote:
>>>>> >> >>>
>>>>> >> >>> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles <[email protected]> wrote:
>>>>> >> >>>>
>>>>> >> >>>> Can we choose a first step? I feel there's consensus around:
>>>>> >> >>>>
>>>>> >> >>>> - the basic idea of what a schema looks like, ignoring logical types or SDK-specific bits
>>>>> >> >>>> - the version of logical type which is a standardized URN+payload plus a representation
>>>>> >> >>>>
>>>>> >> >>>> Perhaps we could commit this and see what it looks like to try to use it?
>>>>> >> >>
>>>>> >> >> +1
>>>>> >> >>
>>>>> >> >>>> It also seems like there might be consensus around the idea of each of:
>>>>> >> >>>>
>>>>> >> >>>> - a coder that simply encodes rows; its payload is just a schema; it is minimalist, canonical
>>>>> >> >>>>
>>>>> >> >>>> - a coder that encodes a non-row using the serialization format of a row; this has to be a coder (versus Convert transforms) so that to/from row conversions can be elided when primitives are fused (just like to/from bytes is elided)
>>>>> >> >>
>>>>> >> >> So, to make it concrete, in the Beam protos we would have an [Elementwise]SchemaCoder whose single parameterization would be FieldType, whose definition is in terms of URN + payload + components (+ representation, for non-primitive types, some details TBD there). It could be deserialized into various different Coder instances (an SDK implementation detail) in an SDK depending on the type. One of the most important primitive field types is Row (aka Struct).
>>>>> >> >>
>>>>> >> >> We would define a byte encoding for each primitive type. We *could* choose to simply require that the encoding of any non-row primitive is the same as its encoding in a single-member row, but that's not necessary.
>>>>> >> >>
>>>>> >> >> In the short term, the window/timestamp/pane info would still live outside via an enclosing WindowCoder, as it does now, not blocking on a desirable but still-to-be-figured-out unification at that level.
>>>>> >> >>
>>>>> >> >> This seems like a good path forward.
>>>>> >> >>
>>>>> >> >>> Actually this doesn't make sense to me. I think from the portability perspective, all we have is schemas - the rest is just a convenience for the SDK. As such, I don't think it makes sense at all to model this as a Coder.
>>>>> >> >>
>>>>> >> >> Coders and Schemas are mutually exclusive on PCollections, and completely specify type information, so I think it makes sense to reuse this (as we're currently doing) until we can get rid of coders altogether.
>>>>> >> >>
>>>>> >> >> (At execution time, we would generalize the notion of a coder to indicate how *batches* of elements are encoded, not just how individual elements are encoded. Here we have the option of letting the runner pick depending on the use (e.g. elementwise for key lookups vs. Arrow for bulk data channel transfer vs. ???, possibly with parameters like "preferred batch size"), or standardizing on one physical byte representation for all communication over the boundary.)
>>>>> >> >>
>>>>> >> >>>> Can we also just have both of these, with different URNs?
>>>>> >> >>>>
>>>>> >> >>>> Kenn
>>>>> >> >>>>
>>>>> >> >>>> On Wed, Jun 12, 2019 at 3:57 PM Reuven Lax <[email protected]> wrote:
>>>>> >> >>>>>
>>>>> >> >>>>> On Wed, Jun 12, 2019 at 3:46 PM Robert Bradshaw <[email protected]> wrote:
>>>>> >> >>>>>>
>>>>> >> >>>>>> On Tue, Jun 11, 2019 at 8:04 PM Kenneth Knowles <[email protected]> wrote:
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> I believe the schema registry is a transient construction-time concept. I don't think there's any need for a concept of a registry in the portable representation.
>>>>> >> >>>>>>>
>>>>> >> >>>>>>>> I'd rather urn:beam:schema:logicaltype:javasdk not be used whenever one has (say) a Java POJO, as that would prevent other SDKs from "understanding" it as above (unless we had a way of declaring it as "just an alias/wrapper").
>>>>> >> >>>>>>>
>>>>> >> >>>>>>> I didn't understand the example I snipped, but I think I understand your concern here. Is this what you want? (a) something presented as a POJO in Java, (b) encoded to a row, but still decoded to the POJO, and (c) a non-Java SDK knows that it is "just a struct" so it is safe to mess about with or even create new ones. If this is what you want it seems potentially useful, but also easy to live without. This can also be done entirely within the Java SDK via conversions, leaving no logical type in the portable pipeline.
>>>>> >> >>>>>>
>>>>> >> >>>>>> I'm imagining a world where someone defines a PTransform that takes a POJO in its constructor, and consumes and produces a POJO, and is now usable from Go with no additional work on the PTransform author's part. But maybe I'm thinking about this wrong and the POJO <-> Row conversion is part of the @ProcessElement magic, not encoded in the schema itself.
>>>>> >> >>>>>
>>>>> >> >>>>> The user's output would have to explicitly have a schema. They would somehow have to tell Beam to infer a schema from the output POJO (e.g. one way to do this is to annotate the POJO with the @DefaultSchema annotation). We don't currently magically turn a POJO into a schema unless we are asked to do so.
>>
>> --
>> Cheers,
>> Gleb
>>
>
--
Cheers,
Gleb
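As a footnote to Reuven's last point above about asking Beam to infer a schema rather than doing it magically: a minimal sketch of that opt-in in the Java SDK using the @DefaultSchema annotation. The Purchase type and its fields are invented for illustration.

    import org.apache.beam.sdk.schemas.JavaFieldSchema;
    import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

    // Annotating the POJO asks the Java SDK to infer a schema from its public
    // fields; the POJO <-> Row conversion then lives in the SDK, not in the
    // portable schema representation itself.
    @DefaultSchema(JavaFieldSchema.class)
    public class Purchase {
      public String userId;
      public long amountCents;
    }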
