Hey Gleb, thanks for bringing this up. The PR that was reverted (8853) is
the same one that I referred to earlier in this thread. It modified the
existing portable schema representation to match what we settled on here -
and yes it removed support for logical types like fixed bytes. I
(foolishly) assumed the portable schema representation wasn't actually used
anywhere yet, so I figured it would be a harmless regression until we were
able to add support again with a logical type registry (BEAM-7855 [1]). But
it turns out the proto representation is used to build cloud objects in the
dataflow runner, and so the change led to issues in Dataflow [2]. Thus the
reversion.

I absolutely agree now that we should finish BEAM-7855 before making
this change again. I'm also taking a look at some of the tests that should
have caught this issue to see how it slipped through the cracks.
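
As a rough illustration of the kind of test that could catch a regression
like this, here is a minimal round-trip sketch. It does not use Beam's
actual classes: FieldType, to_portable, from_portable and the URN string
are all hypothetical stand-ins, and plain dicts stand in for the proto
representation.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class FieldType:
    """Toy field type; a stand-in, not Beam's real FieldType."""
    base: str                          # underlying representation, e.g. "BYTES"
    logical_urn: Optional[str] = None  # set only for logical types
    logical_arg: Optional[int] = None  # e.g. the length of a fixed-bytes field


def to_portable(schema: Dict[str, FieldType]) -> List[dict]:
    """Encode a schema as plain dicts, standing in for the proto form."""
    return [
        {"name": name, "base": ft.base, "urn": ft.logical_urn, "arg": ft.logical_arg}
        for name, ft in schema.items()
    ]


def from_portable(fields: List[dict]) -> Dict[str, FieldType]:
    """Decode the plain-dict form back into a schema."""
    return {f["name"]: FieldType(f["base"], f["urn"], f["arg"]) for f in fields}


def round_trips(schema: Dict[str, FieldType]) -> bool:
    """True if nothing is lost going through the portable form."""
    return from_portable(to_portable(schema)) == schema


# A schema resembling one inferred from an Avro file with a fixed(16) field.
# The URN below is made up for this example.
AVRO_LIKE = {
    "id": FieldType("INT64"),
    "digest": FieldType("BYTES", "example:logical_type:fixed_bytes", 16),
}
```

A representation that silently drops the logical type would make
round_trips(AVRO_LIKE) return False, which is the failure such a test is
meant to surface.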

Brian

[1] https://issues.apache.org/jira/browse/BEAM-7855
[2] https://issues.apache.org/jira/browse/BEAM-8111

On Tue, Sep 3, 2019 at 7:27 AM Gleb Kanterov <[email protected]> wrote:

> Recently there was a pull request (that was reverted) for adding portable
> representation of schemas. It's great to see things moving forward, but I'm
> worried that it doesn't support any logical types, especially fixed bytes.
> That makes runners using portable schemas unusable, for instance, when
> reading certain Avro files, while it was possible before. I'm wondering if
> it would be possible to include logical types into initial implementation
> or add a feature flag to opt-in to portable schema representation.
>
> [1]: https://github.com/apache/beam/pull/8853
>
> Gleb
>
> On Wed, Jun 19, 2019 at 7:08 PM Kenneth Knowles <[email protected]> wrote:
>
>> Seems like a practical approach to get moving on things. Just to restate
>> my understanding:
>>
>>  - in Java it is PCollection<T> but with the row coder holding
>> to/from/clazz (I'm calling it row coder because row is the binary format,
>> while schemas may have multiple possible formats)
>>  - in portability, the URN + payload "is" the type, so it is
>> PCollection<row, to, from, clazz>
>>  - in Python it is interpreted as PCollection<row>
>>
>> We may eventually hit an issue caused by the asymmetry between
>> portability, the authoring SDK (in this case Java), and the external SDK
>> (in this case hypothetically Python). But perhaps the asymmetry is
>> natural/harmless. We may eventually want it in the general form of the
>> payload carrying identifying info of what the authoring SDK is.
>>
>> As to attaching conversions to operations rather than coders, I'm not so
>> sure. It gets at the two natures of coders:
>>
>> 1. URN + payload fully specify how to interpret the bytes of an element
>> 2. An SDK's implementation of the coder for a URN + payload is a contract
>> with DoFns authored in that SDK what SDK-specific type they will receive
>>
>> The idea of storing the to/from conversions on operations themselves
>> would be a sort of decoupling of 1 and 2. Assuming we design something that
>> still allows eliding conversions, I expect that will look nearly identical.
>> Coders are representation to/from Bytes, while this conversion layer is T
>> to/from representation. So I might think we can do it without adding
>> anything to the model.
>>
>> Kenn
>>
>> On Tue, Jun 18, 2019 at 11:04 PM Reuven Lax <[email protected]> wrote:
>>
>>> Robert, you are correct that in principle the to/from functions are
>>> needed on the operation, as that's where automatic conversion happens (in
>>> Java it happens in DoFnRunner). However there are three blockers there:
>>>
>>> 1. As Brian mentioned, the issue in Java is that we never have
>>> PCollection<Row> in this case. The source PCollection will simply be
>>> PCollection<T>, where T has a schema. The to/from functions are now
>>> required to interpret this PCollection. Currently we need to put it on the
>>> PCollection itself to make Java's type system happy (an alternative is to
>>> always create an intermediate PCollection<Row>, but that would be
>>> computationally expensive). We might be able to find a way to model this in
>>> Java with the to/from on the operation, however I suspect it would be
>>> difficult and a lot of work.
>>>
>>> 2. I believe there are some cases where PTransforms access the to/from
>>> functions in expand(), which is before we have an operation to attach
>>> those functions to. Again this is presumably solvable, but would require
>>> design and more work.
>>>
>>> 3. Currently the user can call setSchema on any PCollection, and pass in
>>> to/from functions there. We would have to rethink this API.
>>>
>>> So I think leaving it in the coder is the pragmatic approach for now,
>>> though it would be interesting to see if we could solve the above issues
>>> and instead automatically propagate the functions to the operation.
>>>
>>> I agree that we should not make these things opaque in the portable
>>> representation, if only for ease of debugging. However they should not be
>>> needed for cross-language calls.
>>>
>>> Reuven
>>>
>>> On Tue, Jun 18, 2019 at 5:09 AM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> Thanks for updating that alternative.
>>>>
>>>> As for the to/from functions, it does seem pragmatic to dangle them
>>>> off the purely portable representation (either as a field there, or as
>>>> an opaque logical type whose payload contains the to/from functions,
>>>> or a separate coder that wraps the schema coder (though I can't see
>>>> how the latter would work well if nesting is allowed)) until we figure
>>>> out a good way to attach them to the operations themselves.
>>>>
>>>> On Tue, Jun 18, 2019 at 2:37 AM Brian Hulette <[email protected]>
>>>> wrote:
>>>> >
>>>> > Realized I completely ignored one of your points, added another
>>>> response inline.
>>>> >
>>>> > On Fri, Jun 14, 2019 at 2:20 AM Robert Bradshaw <[email protected]>
>>>> wrote:
>>>> >>
>>>> >> On Thu, Jun 13, 2019 at 8:42 PM Reuven Lax <[email protected]> wrote:
>>>> >> >
>>>> >> > Spoke to Brian about his proposal. It is essentially this:
>>>> >> >
>>>> >> > We create PortableSchemaCoder, with a well-known URN. This coder
>>>> is parameterized by the schema (i.e. list of field name -> field type
>>>> pairs).
>>>> >>
>>>> >> Given that we have a field type that is (list of field names -> field
>>>> >> type pairs), is there a reason to do this enumeration at the top
>>>> level
>>>> >> as well? This would likely also eliminate some of the strangeness
>>>> >> where we want to treat a PCollection with a single-field row as a
>>>> >> PCollection with just that value instead.
>>>> >
>>>> >
>>>> > This is part of what I was suggesting in my "Root schema is a logical
>>>> type" alternative [1], except that the language about SDK-specific logical
>>>> types is now obsolete. I'll update it to better reflect this alternative.
>>>> > I do think at the very least we should just have one (list of field
>>>> names -> field type pairs) that is re-used, which is what I did in my PR
>>>> [2].
>>>> >
>>>> > [1]
>>>> https://docs.google.com/document/d/1uu9pJktzT_O3DxGd1-Q2op4nRk4HekIZbzi-0oTAips/edit#heading=h.7570feur1qin
>>>> > [2]
>>>> https://github.com/apache/beam/pull/8853/files#diff-f0d64c2cfc4583bfe2a7e5ee59818ae2L686
>>>> >
>>>> >>
>>>> >>
>>>> >> > Java also continues to have its own CustomSchemaCoder. This is
>>>> parameterized by the schema as well as the to/from functions needed to make
>>>> the Java API "nice."
>>>> >> >
>>>> >> > When the expansion service expands a Java PTransform for usage
>>>> across languages, it will add a transform mapping the PCollection with
>>>> CustomSchemaCoder to a PCollection which has PortableSchemaCoder. This way
>>>> Java can maintain the information needed to maintain its API (and Python
>>>> can do the same), but there's no need to shove this information into the
>>>> well-known portable representation.
>>>> >> >
>>>> >> > Brian, can you confirm that this was your proposal? If so, I like
>>>> it.
>>>> >>
>>>> >> The major downside of this that I see is that it assumes that
>>>> >> transparency is only needed at certain "boundaries" and everything
>>>> >> between these boundaries is opaque. I think we'd be better served by
>>>> a
>>>> >> format where schemas are transparently represented throughout. For
>>>> >> example, the "boundaries" between runner and SDK are not known at
>>>> >> pipeline construction time, and we want the runner <-> SDK
>>>> >> communication to understand the schemas to be able to use more
>>>> >> efficient transport mechanisms (e.g. batches of arrow records). It
>>>> may
>>>> >> also be common for a pipeline in language X to invoke two transforms
>>>> >> in language Y in succession (e.g. two SQL statements) in which case
>>>> >> introducing two extra transforms in the expansion service would be
>>>> >> wasteful. I also think we want to allow the flexibility for runners
>>>> to
>>>> >> swap out transforms as optimizations regardless of construction-time
>>>> >> boundaries (e.g. implementing a projection natively, rather than
>>>> >> outsourcing to the SDK).
>>>> >>
>>>> >> Are the to/from conversion functions the only extra information
>>>> needed
>>>> >> to make the Java APIs nice? If so, can they be attached to the
>>>> >> operations themselves (where it seems they're actually needed/used),
>>>> >> rather than to the schema/coder of the PCollection? Alternatively,
>>>> I'd
>>>> >> prefer this be opaque metadata attached to a transparent schema
>>>> rather
>>>> >> than making the whole schema opaque.
>>>> >>
>>>> >> > We've gone back and forth discussing abstracts for over a month
>>>> now. I suggest that the next step should be to create a PR, and move
>>>> discussion to that PR. Having actual code can often make discussion much
>>>> more concrete.
>>>> >>
>>>> >> +1 to a PR, though I feel like there are fundamental high-level
>>>> issues
>>>> >> that are still not decided. (I suppose we should be open to throwing
>>>> >> whole PRs away in that case.) There are certainly pieces that we'll
>>>> >> know that we need (like the ability to serialize a row consistently
>>>> in
>>>> >> all languages) that we can get in immediately.
>>>> >>
>>>> >> > Reuven
>>>> >> >
>>>> >> > On Thu, Jun 13, 2019 at 6:28 AM Robert Bradshaw <
>>>> [email protected]> wrote:
>>>> >> >>
>>>> >> >> On Thu, Jun 13, 2019 at 5:47 AM Reuven Lax <[email protected]>
>>>> wrote:
>>>> >> >>>
>>>> >> >>>
>>>> >> >>> On Wed, Jun 12, 2019 at 8:29 PM Kenneth Knowles <[email protected]>
>>>> wrote:
>>>> >> >>>>
>>>> >> >>>> Can we choose a first step? I feel there's consensus around:
>>>> >> >>>>
>>>> >> >>>>  - the basic idea of what a schema looks like, ignoring logical
>>>> types or SDK-specific bits
>>>> >> >>>>  - the version of logical type which is a standardized
>>>> URN+payload plus a representation
>>>> >> >>>>
>>>> >> >>>> Perhaps we could commit this and see what it looks like to try
>>>> to use it?
>>>> >> >>
>>>> >> >>
>>>> >> >> +1
>>>> >> >>
>>>> >> >>>>
>>>> >> >>>> It also seems like there might be consensus around the idea of
>>>> each of:
>>>> >> >>>>
>>>> >> >>>>  - a coder that simply encodes rows; its payload is just a
>>>> schema; it is minimalist, canonical
>>>> >> >>>>
>>>> >> >>>>  - a coder that encodes a non-row using the serialization
>>>> format of a row; this has to be a coder (versus Convert transforms) so that
>>>> to/from row conversions can be elided when primitives are fused (just like
>>>> to/from bytes is elided)
>>>> >> >>
>>>> >> >>
>>>> >> >> So, to make it concrete, in the Beam protos we would have an
>>>> [Elementwise]SchemaCoder whose single parameterization would be FieldType,
>>>> whose definition is in terms of URN + payload + components (+
>>>> representation, for non-primitive types, some details TBD there). It could
>>>> be deserialized into various different Coder instances (an SDK
>>>> implementation detail) in an SDK depending on the type. One of the most
>>>> important primitive field types is Row (aka Struct).
>>>> >> >>
>>>> >> >> We would define a byte encoding for each primitive type. We
>>>> *could* choose to simply require that the encoding of any non-row primitive
>>>> is the same as its encoding in a single-member row, but that's not
>>>> necessary.
>>>> >> >>
>>>> >> >> In the short term, the window/timestamp/pane info would still
>>>> live outside via an enclosing WindowCoder, as it does now, not blocking on
>>>> a desirable but still-to-be-figured-out unification at that level.
>>>> >> >>
>>>> >> >> This seems like a good path forward.
>>>> >> >>
>>>> >> >>> Actually this doesn't make sense to me. I think from the
>>>> portability perspective, all we have is schemas - the rest is just a
>>>> convenience for the SDK. As such, I don't think it makes sense at all to
>>>> model this as a Coder.
>>>> >> >>
>>>> >> >>
>>>> >> >> Coder and Schemas are mutually exclusive on PCollections, and
>>>> completely specify type information, so I think it makes sense to reuse
>>>> this (as we're currently doing) until we can get rid of coders altogether.
>>>> >> >>
>>>> >> >> (At execution time, we would generalize the notion of a coder to
>>>> indicate how *batches* of elements are encoded, not just how individual
>>>> elements are encoded. Here we have the option of letting the runner pick
>>>> depending on the use (e.g. elementwise for key lookups vs. arrow for bulk
>>>> data channel transfer vs ???, possibly with parameters like "preferred
>>>> batch size") or standardizing on one physical byte representation for all
>>>> communication over the boundary.)
>>>> >> >>
>>>> >> >>>
>>>> >> >>>
>>>> >> >>>>
>>>> >> >>>>
>>>> >> >>>> Can we also just have both of these, with different URNs?
>>>> >> >>>>
>>>> >> >>>> Kenn
>>>> >> >>>>
>>>> >> >>>> On Wed, Jun 12, 2019 at 3:57 PM Reuven Lax <[email protected]>
>>>> wrote:
>>>> >> >>>>>
>>>> >> >>>>>
>>>> >> >>>>>
>>>> >> >>>>> On Wed, Jun 12, 2019 at 3:46 PM Robert Bradshaw <
>>>> [email protected]> wrote:
>>>> >> >>>>>>
>>>> >> >>>>>> On Tue, Jun 11, 2019 at 8:04 PM Kenneth Knowles <
>>>> [email protected]> wrote:
>>>> >> >>>>>>>
>>>> >> >>>>>>>
>>>> >> >>>>>>> I believe the schema registry is a transient
>>>> construction-time concept. I don't think there's any need for a concept of
>>>> a registry in the portable representation.
>>>> >> >>>>>>>
>>>> >> >>>>>>>> I'd rather urn:beam:schema:logicaltype:javasdk not be used
>>>> whenever one has (say) a Java POJO as that would prevent other SDKs from
>>>> "understanding" it as above (unless we had a way of declaring it as "just
>>>> an alias/wrapper").
>>>> >> >>>>>>>
>>>> >> >>>>>>>
>>>> >> >>>>>>> I didn't understand the example I snipped, but I think I
>>>> understand your concern here. Is this what you want? (a) something
>>>> presented as a POJO in Java (b) encoded to a row, but still decoded to the
>>>> POJO and (c) non-Java SDK knows that it is "just a struct" so it is safe to
>>>> mess about with or even create new ones. If this is what you want it seems
>>>> potentially useful, but also easy to live without. This can also be done
>>>> entirely within the Java SDK via conversions, leaving no logical type in
>>>> the portable pipeline.
>>>> >> >>>>>>
>>>> >> >>>>>>
>>>> >> >>>>>> I'm imagining a world where someone defines a PTransform that
>>>> takes a POJO for a constructor, and consumes and produces a POJO, and is
>>>> now usable from Go with no additional work on the PTransform author's
>>>> part.  But maybe I'm thinking about this wrong and the POJO <-> Row
>>>> conversion is part of the @ProcessElement magic, not encoded in the schema
>>>> itself.
>>>> >> >>>>>
>>>> >> >>>>>
>>>> >> >>>>> The user's output would have to be explicitly schema. They
>>>> would somehow have to tell Beam to infer a schema from the output POJO
>>>> (e.g. one way to do this is to annotate the POJO with the @DefaultSchema
>>>> annotation).  We don't currently magically turn a POJO into a schema unless
>>>> we are asked to do so.
>>>>
>>>
>
> --
> Cheers,
> Gleb
>
